diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-09-17 10:29:32 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-09-17 10:29:32 +0800 |
| commit | f71f7fcf99ba6d64bfa49fd7411e06bdc2b9d591 (patch) | |
| tree | e0e98625edaab040eed21fd1a8b277f2c0546ad3 /weed/server | |
| parent | 23baa3c36ce468a36d89abae59f4411cdc446043 (diff) | |
| parent | 5eee4983f36f55a2a01381e8af278b28919dbe90 (diff) | |
| download | seaweedfs-f71f7fcf99ba6d64bfa49fd7411e06bdc2b9d591.tar.xz seaweedfs-f71f7fcf99ba6d64bfa49fd7411e06bdc2b9d591.zip | |
Merge pull request #15 from chrislusf/master
sync
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 3 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_kv.go | 42 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 74 | ||||
| -rw-r--r-- | weed/server/filer_server.go | 6 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 56 | ||||
| -rw-r--r-- | weed/server/filer_ui/templates.go | 15 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 4 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 2 | ||||
| -rw-r--r-- | weed/server/master_server_handlers_admin.go | 2 | ||||
| -rw-r--r-- | weed/server/volume_grpc_admin.go | 10 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client_to_master.go | 64 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 14 | ||||
| -rw-r--r-- | weed/server/volume_server_handlers_read.go | 2 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 8 |
14 files changed, 234 insertions, 68 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index d3ced0a53..20c2502b9 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -314,7 +314,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr glog.V(4).Infof("DeleteEntry %v", req) - err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, nil) + err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures) resp = &filer_pb.DeleteEntryResponse{} if err != nil { resp.Error = err.Error() @@ -426,6 +426,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb. MaxMb: uint32(fs.option.MaxMB), DirBuckets: fs.filer.DirBucketsPath, Cipher: fs.filer.Cipher, + Signature: fs.filer.Signature, } glog.V(4).Infof("GetFilerConfiguration: %v", t) diff --git a/weed/server/filer_grpc_server_kv.go b/weed/server/filer_grpc_server_kv.go new file mode 100644 index 000000000..3cb47115e --- /dev/null +++ b/weed/server/filer_grpc_server_kv.go @@ -0,0 +1,42 @@ +package weed_server + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (fs *FilerServer) KvGet(ctx context.Context, req *filer_pb.KvGetRequest) (*filer_pb.KvGetResponse, error) { + + value, err := fs.filer.Store.KvGet(ctx, req.Key) + if err == filer.ErrKvNotFound { + return &filer_pb.KvGetResponse{}, nil + } + + if err != nil { + return &filer_pb.KvGetResponse{Error: err.Error()}, nil + } + + return &filer_pb.KvGetResponse{ + Value: value, + }, nil + +} + +// KvPut sets the key~value. if empty value, delete the kv entry +func (fs *FilerServer) KvPut(ctx context.Context, req *filer_pb.KvPutRequest) (*filer_pb.KvPutResponse, error) { + + if len(req.Value) == 0 { + if err := fs.filer.Store.KvDelete(ctx, req.Key); err != nil { + return &filer_pb.KvPutResponse{Error: err.Error()}, nil + } + } + + err := fs.filer.Store.KvPut(ctx, req.Key, req.Value) + if err != nil { + return &filer_pb.KvPutResponse{Error: err.Error()}, nil + } + + return &filer_pb.KvPutResponse{}, nil + +} diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 9ba45edfe..634fb5211 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -2,6 +2,7 @@ package weed_server import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "strings" "time" @@ -24,7 +25,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName, req.Signature) + eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -37,12 +38,21 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime = time.Unix(0, processedTsNs) } - err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { - fs.filer.MetaAggregator.ListenersLock.Lock() - fs.filer.MetaAggregator.ListenersCond.Wait() - fs.filer.MetaAggregator.ListenersLock.Unlock() - return true - }, eachLogEntryFn) + for { + lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.filer.MetaAggregator.ListenersLock.Lock() + fs.filer.MetaAggregator.ListenersCond.Wait() + fs.filer.MetaAggregator.ListenersLock.Unlock() + return true + }, eachLogEntryFn) + if err != nil { + glog.Errorf("processed to %v: %v", lastReadTime, err) + time.Sleep(3127 * time.Millisecond) + if err != log_buffer.ResumeError { + break + } + } + } return err @@ -59,30 +69,37 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName, req.Signature) + eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) - if _, ok := fs.filer.Store.ActualStore.(filer.FilerLocalStore); ok { - // println("reading from persisted logs ...") - processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) - if err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) - } + // println("reading from persisted logs ...") + processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } - if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) - } - glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) } + glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) // println("reading from in memory logs ...") - err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { - fs.listenersLock.Lock() - fs.listenersCond.Wait() - fs.listenersLock.Unlock() - return true - }, eachLogEntryFn) + for { + lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.listenersLock.Lock() + fs.listenersCond.Wait() + fs.listenersLock.Unlock() + return true + }, eachLogEntryFn) + if err != nil { + glog.Errorf("processed to %v: %v", lastReadTime, err) + time.Sleep(3127 * time.Millisecond) + if err != log_buffer.ResumeError { + break + } + } + } return err @@ -104,13 +121,20 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati } } -func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { +func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + foundSelf := false for _, sig := range eventNotification.Signatures { if sig == clientSignature && clientSignature != 0 { return nil } + if sig == fs.filer.Signature { + foundSelf = true + } + } + if !foundSelf { + eventNotification.Signatures = append(eventNotification.Signatures, fs.filer.Signature) } // get complete path to the file or directory diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 160ea5a6d..18809162a 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -20,6 +20,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" @@ -156,10 +157,7 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) { if metricsAddress == "" && metricsIntervalSec <= 0 { return } - go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather, - func() (addr string, intervalSeconds int) { - return metricsAddress, metricsIntervalSec - }) + go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather, metricsAddress, metricsIntervalSec) } func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) { diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 0f6176356..61011fc20 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "crypto/md5" + "fmt" "hash" "io" "io/ioutil" @@ -46,7 +47,11 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * var err error var md5bytes []byte if r.Method == "POST" { - reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) + if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") { + reply, err = fs.mkdir(ctx, w, r) + } else { + reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) + } } else { reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) } @@ -254,3 +259,52 @@ func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCe return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil } } + +func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http.Request) (filerResult *FilerPostResult, replyerr error) { + + // detect file mode + modeStr := r.URL.Query().Get("mode") + if modeStr == "" { + modeStr = "0660" + } + mode, err := strconv.ParseUint(modeStr, 8, 32) + if err != nil { + glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) + mode = 0660 + } + + // fix the path + path := r.URL.Path + if strings.HasSuffix(path, "/") { + path = path[:len(path)-1] + } + + existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) + if err == nil && existingEntry != nil { + replyerr = fmt.Errorf("dir %s already exists", path) + return + } + + glog.V(4).Infoln("mkdir", path) + entry := &filer.Entry{ + FullPath: util.FullPath(path), + Attr: filer.Attr{ + Mtime: time.Now(), + Crtime: time.Now(), + Mode: os.FileMode(mode) | os.ModeDir, + Uid: OS_UID, + Gid: OS_GID, + }, + } + + filerResult = &FilerPostResult{ + Name: util.FullPath(path).Name(), + } + + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { + replyerr = dbErr + filerResult.Error = dbErr.Error() + glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr) + } + return filerResult, replyerr +} diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go index e532b27e2..04a81433b 100644 --- a/weed/server/filer_ui/templates.go +++ b/weed/server/filer_ui/templates.go @@ -3,10 +3,19 @@ package master_ui import ( "github.com/dustin/go-humanize" "html/template" + "net/url" + "strings" ) +func printpath(parts ...string) string { + concat := strings.Join(parts, "") + escaped := url.PathEscape(concat) + return strings.ReplaceAll(escaped, "%2F", "/") +} + var funcMap = template.FuncMap{ "humanizeBytes": humanize.Bytes, + "printpath": printpath, } var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html> @@ -50,7 +59,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <div class="row"> <div> {{ range $entry := .Breadcrumbs }} - <a href="{{ $entry.Link }}" > + <a href="{{ printpath $entry.Link }}" > {{ $entry.Name }} </a> {{ end }} @@ -69,11 +78,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <td> {{if $entry.IsDirectory}} <img src="/seaweedfsstatic/images/folder.gif" width="20" height="23"> - <a href={{ print $path "/" $entry.Name "/"}} > + <a href="{{ printpath $path "/" $entry.Name "/"}}" > {{ $entry.Name }} </a> {{else}} - <a href={{ print $path "/" $entry.Name }} > + <a href="{{ printpath $path "/" $entry.Name }}" > {{ $entry.Name }} </a> {{end}} diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 108892f92..93ecefb74 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -12,7 +12,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" ) @@ -73,9 +72,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, - MetricsAddress: ms.option.MetricsAddress, - MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), - StorageBackends: backend.ToPbStorageBackends(), }); err != nil { glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err) return err diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 282c75679..168975fb6 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/raft" @@ -184,6 +185,7 @@ func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_ resp := &master_pb.GetMasterConfigurationResponse{ MetricsAddress: ms.option.MetricsAddress, MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), + StorageBackends: backend.ToPbStorageBackends(), } return resp, nil diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 7595c0171..34235384f 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -110,7 +110,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) } else { url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path } - http.Redirect(w, r, url, http.StatusMovedPermanently) + http.Redirect(w, r, url, http.StatusPermanentRedirect) } else { writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error)) } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index f81ec649d..9296c63e9 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -196,6 +196,16 @@ func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_serv } +func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) { + + resp := &volume_server_pb.VolumeServerLeaveResponse{} + + vs.StopHeartbeat() + + return resp, nil + +} + func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) { resp := &volume_server_pb.VolumeNeedleStatusResponse{} diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 3a1b95f26..0c0cc39c1 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -2,6 +2,7 @@ package weed_server import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/operation" "time" "google.golang.org/grpc" @@ -21,6 +22,27 @@ import ( func (vs *VolumeServer) GetMaster() string { return vs.currentMaster } + +func (vs *VolumeServer) checkWithMaster() (err error) { + for _, master := range vs.SeedMasterNodes { + err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master %s configuration: %v", master, err) + } + vs.MetricsAddress, vs.MetricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) + backend.LoadFromPbStorageBackends(resp.StorageBackends) + return nil + }) + if err == nil { + return + } else { + glog.V(0).Infof("checkWithMaster %s: %v", master, err) + } + } + return +} + func (vs *VolumeServer) heartbeat() { glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes) @@ -31,7 +53,7 @@ func (vs *VolumeServer) heartbeat() { var err error var newLeader string - for { + for vs.isHeartbeating { for _, master := range vs.SeedMasterNodes { if newLeader != "" { // the new leader may actually is the same master @@ -52,20 +74,35 @@ func (vs *VolumeServer) heartbeat() { newLeader = "" vs.store.MasterAddress = "" } + if !vs.isHeartbeating { + break + } } } } +func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) { + if !vs.isHeartbeating { + return true + } + vs.isHeartbeating = false + vs.stopChan <- true + return false +} + func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { - grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterNode, err) } defer grpcConection.Close() client := master_pb.NewSeaweedClient(grpcConection) - stream, err := client.SendHeartbeat(context.Background()) + stream, err := client.SendHeartbeat(ctx) if err != nil { glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err) return "", err @@ -96,13 +133,6 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi doneChan <- nil return } - if in.GetMetricsAddress() != "" && vs.MetricsAddress != in.GetMetricsAddress() { - vs.MetricsAddress = in.GetMetricsAddress() - vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds()) - } - if len(in.StorageBackends) > 0 { - backend.LoadFromPbStorageBackends(in.StorageBackends) - } } }() @@ -168,14 +198,10 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi return "", err } case <-volumeTickChan: - if vs.SendHeartbeat { - glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) - if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) - return "", err - } - } else { - glog.V(4).Infof("volume server %s:%d skip send heartbeat", vs.store.Ip, vs.store.Port) + glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) + if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + return "", err } case <-ecShardTickChan: glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) @@ -185,6 +211,8 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi } case err = <-doneChan: return + case <-vs.stopChan: + return } } } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 6612e9045..c600da21e 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -31,7 +31,8 @@ type VolumeServer struct { MetricsAddress string MetricsIntervalSec int fileSizeLimitBytes int64 - SendHeartbeat bool + isHeartbeating bool + stopChan chan bool } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, @@ -67,9 +68,13 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, - SendHeartbeat: true, + isHeartbeating: true, + stopChan: make(chan bool), } vs.SeedMasterNodes = masterNodes + + vs.checkWithMaster() + vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) @@ -93,10 +98,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, go vs.heartbeat() hostAddress := fmt.Sprintf("%s:%d", ip, port) - go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather, - func() (addr string, intervalSeconds int) { - return vs.MetricsAddress, vs.MetricsIntervalSec - }) + go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather, vs.MetricsAddress, vs.MetricsIntervalSec) return vs } diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 07289e880..bb04678d6 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -95,7 +95,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } // glog.V(4).Infoln("read bytes", count, "error", err) if err != nil || count < 0 { - glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err) + glog.V(3).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err) w.WriteHeader(http.StatusNotFound) return } diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 57723ab0b..121c0d2bb 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -259,7 +259,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) dir, name := util.FullPath(fullFilePath).DirAndName() - return filer_pb.Remove(fs, dir, name, true, false, false, false, fs.signature) + return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature}) } @@ -480,7 +480,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { f.reader = nil } if f.reader == nil { - chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32) + chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64) f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize) } @@ -552,9 +552,9 @@ func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) { var err error switch whence { - case 0: + case io.SeekStart: f.off = 0 - case 2: + case io.SeekEnd: if fi, err := f.fs.stat(ctx, f.name); err != nil { return 0, err } else { |
