aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2020-09-17 10:29:32 +0800
committerGitHub <noreply@github.com>2020-09-17 10:29:32 +0800
commitf71f7fcf99ba6d64bfa49fd7411e06bdc2b9d591 (patch)
treee0e98625edaab040eed21fd1a8b277f2c0546ad3 /weed/server
parent23baa3c36ce468a36d89abae59f4411cdc446043 (diff)
parent5eee4983f36f55a2a01381e8af278b28919dbe90 (diff)
downloadseaweedfs-f71f7fcf99ba6d64bfa49fd7411e06bdc2b9d591.tar.xz
seaweedfs-f71f7fcf99ba6d64bfa49fd7411e06bdc2b9d591.zip
Merge pull request #15 from chrislusf/master
sync
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server.go3
-rw-r--r--weed/server/filer_grpc_server_kv.go42
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go74
-rw-r--r--weed/server/filer_server.go6
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go56
-rw-r--r--weed/server/filer_ui/templates.go15
-rw-r--r--weed/server/master_grpc_server.go4
-rw-r--r--weed/server/master_grpc_server_volume.go2
-rw-r--r--weed/server/master_server_handlers_admin.go2
-rw-r--r--weed/server/volume_grpc_admin.go10
-rw-r--r--weed/server/volume_grpc_client_to_master.go64
-rw-r--r--weed/server/volume_server.go14
-rw-r--r--weed/server/volume_server_handlers_read.go2
-rw-r--r--weed/server/webdav_server.go8
14 files changed, 234 insertions, 68 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index d3ced0a53..20c2502b9 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -314,7 +314,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
glog.V(4).Infof("DeleteEntry %v", req)
- err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, nil)
+ err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures)
resp = &filer_pb.DeleteEntryResponse{}
if err != nil {
resp.Error = err.Error()
@@ -426,6 +426,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
MaxMb: uint32(fs.option.MaxMB),
DirBuckets: fs.filer.DirBucketsPath,
Cipher: fs.filer.Cipher,
+ Signature: fs.filer.Signature,
}
glog.V(4).Infof("GetFilerConfiguration: %v", t)
diff --git a/weed/server/filer_grpc_server_kv.go b/weed/server/filer_grpc_server_kv.go
new file mode 100644
index 000000000..3cb47115e
--- /dev/null
+++ b/weed/server/filer_grpc_server_kv.go
@@ -0,0 +1,42 @@
+package weed_server
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (fs *FilerServer) KvGet(ctx context.Context, req *filer_pb.KvGetRequest) (*filer_pb.KvGetResponse, error) {
+
+ value, err := fs.filer.Store.KvGet(ctx, req.Key)
+ if err == filer.ErrKvNotFound {
+ return &filer_pb.KvGetResponse{}, nil
+ }
+
+ if err != nil {
+ return &filer_pb.KvGetResponse{Error: err.Error()}, nil
+ }
+
+ return &filer_pb.KvGetResponse{
+ Value: value,
+ }, nil
+
+}
+
+// KvPut sets the key~value. if empty value, delete the kv entry
+func (fs *FilerServer) KvPut(ctx context.Context, req *filer_pb.KvPutRequest) (*filer_pb.KvPutResponse, error) {
+
+ if len(req.Value) == 0 {
+ if err := fs.filer.Store.KvDelete(ctx, req.Key); err != nil {
+ return &filer_pb.KvPutResponse{Error: err.Error()}, nil
+ }
+ }
+
+ err := fs.filer.Store.KvPut(ctx, req.Key, req.Value)
+ if err != nil {
+ return &filer_pb.KvPutResponse{Error: err.Error()}, nil
+ }
+
+ return &filer_pb.KvPutResponse{}, nil
+
+}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 9ba45edfe..634fb5211 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -2,6 +2,7 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"strings"
"time"
@@ -24,7 +25,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName, req.Signature)
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
@@ -37,12 +38,21 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime = time.Unix(0, processedTsNs)
}
- err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
- fs.filer.MetaAggregator.ListenersLock.Lock()
- fs.filer.MetaAggregator.ListenersCond.Wait()
- fs.filer.MetaAggregator.ListenersLock.Unlock()
- return true
- }, eachLogEntryFn)
+ for {
+ lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.filer.MetaAggregator.ListenersLock.Lock()
+ fs.filer.MetaAggregator.ListenersCond.Wait()
+ fs.filer.MetaAggregator.ListenersLock.Unlock()
+ return true
+ }, eachLogEntryFn)
+ if err != nil {
+ glog.Errorf("processed to %v: %v", lastReadTime, err)
+ time.Sleep(3127 * time.Millisecond)
+ if err != log_buffer.ResumeError {
+ break
+ }
+ }
+ }
return err
@@ -59,30 +69,37 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName, req.Signature)
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
- if _, ok := fs.filer.Store.ActualStore.(filer.FilerLocalStore); ok {
- // println("reading from persisted logs ...")
- processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
- }
+ // println("reading from persisted logs ...")
+ processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
- glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
}
+ glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
// println("reading from in memory logs ...")
- err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
- fs.listenersLock.Lock()
- fs.listenersCond.Wait()
- fs.listenersLock.Unlock()
- return true
- }, eachLogEntryFn)
+ for {
+ lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.listenersLock.Lock()
+ fs.listenersCond.Wait()
+ fs.listenersLock.Unlock()
+ return true
+ }, eachLogEntryFn)
+ if err != nil {
+ glog.Errorf("processed to %v: %v", lastReadTime, err)
+ time.Sleep(3127 * time.Millisecond)
+ if err != log_buffer.ResumeError {
+ break
+ }
+ }
+ }
return err
@@ -104,13 +121,20 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati
}
}
-func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+ foundSelf := false
for _, sig := range eventNotification.Signatures {
if sig == clientSignature && clientSignature != 0 {
return nil
}
+ if sig == fs.filer.Signature {
+ foundSelf = true
+ }
+ }
+ if !foundSelf {
+ eventNotification.Signatures = append(eventNotification.Signatures, fs.filer.Signature)
}
// get complete path to the file or directory
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 160ea5a6d..18809162a 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -20,6 +20,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer"
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
@@ -156,10 +157,7 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
if metricsAddress == "" && metricsIntervalSec <= 0 {
return
}
- go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
- func() (addr string, intervalSeconds int) {
- return metricsAddress, metricsIntervalSec
- })
+ go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather, metricsAddress, metricsIntervalSec)
}
func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 0f6176356..61011fc20 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"crypto/md5"
+ "fmt"
"hash"
"io"
"io/ioutil"
@@ -46,7 +47,11 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
var err error
var md5bytes []byte
if r.Method == "POST" {
- reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
+ if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
+ reply, err = fs.mkdir(ctx, w, r)
+ } else {
+ reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
+ }
} else {
reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
}
@@ -254,3 +259,52 @@ func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCe
return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil
}
}
+
+func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http.Request) (filerResult *FilerPostResult, replyerr error) {
+
+ // detect file mode
+ modeStr := r.URL.Query().Get("mode")
+ if modeStr == "" {
+ modeStr = "0660"
+ }
+ mode, err := strconv.ParseUint(modeStr, 8, 32)
+ if err != nil {
+ glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+ mode = 0660
+ }
+
+ // fix the path
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ path = path[:len(path)-1]
+ }
+
+ existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if err == nil && existingEntry != nil {
+ replyerr = fmt.Errorf("dir %s already exists", path)
+ return
+ }
+
+ glog.V(4).Infoln("mkdir", path)
+ entry := &filer.Entry{
+ FullPath: util.FullPath(path),
+ Attr: filer.Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: os.FileMode(mode) | os.ModeDir,
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }
+
+ filerResult = &FilerPostResult{
+ Name: util.FullPath(path).Name(),
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
+ replyerr = dbErr
+ filerResult.Error = dbErr.Error()
+ glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr)
+ }
+ return filerResult, replyerr
+}
diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go
index e532b27e2..04a81433b 100644
--- a/weed/server/filer_ui/templates.go
+++ b/weed/server/filer_ui/templates.go
@@ -3,10 +3,19 @@ package master_ui
import (
"github.com/dustin/go-humanize"
"html/template"
+ "net/url"
+ "strings"
)
+func printpath(parts ...string) string {
+ concat := strings.Join(parts, "")
+ escaped := url.PathEscape(concat)
+ return strings.ReplaceAll(escaped, "%2F", "/")
+}
+
var funcMap = template.FuncMap{
"humanizeBytes": humanize.Bytes,
+ "printpath": printpath,
}
var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html>
@@ -50,7 +59,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<div class="row">
<div>
{{ range $entry := .Breadcrumbs }}
- <a href="{{ $entry.Link }}" >
+ <a href="{{ printpath $entry.Link }}" >
{{ $entry.Name }}
</a>
{{ end }}
@@ -69,11 +78,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<td>
{{if $entry.IsDirectory}}
<img src="/seaweedfsstatic/images/folder.gif" width="20" height="23">
- <a href={{ print $path "/" $entry.Name "/"}} >
+ <a href="{{ printpath $path "/" $entry.Name "/"}}" >
{{ $entry.Name }}
</a>
{{else}}
- <a href={{ print $path "/" $entry.Name }} >
+ <a href="{{ printpath $path "/" $entry.Name }}" >
{{ $entry.Name }}
</a>
{{end}}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 108892f92..93ecefb74 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -12,7 +12,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
)
@@ -73,9 +72,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
- MetricsAddress: ms.option.MetricsAddress,
- MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
- StorageBackends: backend.ToPbStorageBackends(),
}); err != nil {
glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 282c75679..168975fb6 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/raft"
@@ -184,6 +185,7 @@ func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_
resp := &master_pb.GetMasterConfigurationResponse{
MetricsAddress: ms.option.MetricsAddress,
MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ StorageBackends: backend.ToPbStorageBackends(),
}
return resp, nil
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 7595c0171..34235384f 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -110,7 +110,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
} else {
url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path
}
- http.Redirect(w, r, url, http.StatusMovedPermanently)
+ http.Redirect(w, r, url, http.StatusPermanentRedirect)
} else {
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error))
}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index f81ec649d..9296c63e9 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -196,6 +196,16 @@ func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_serv
}
+func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) {
+
+ resp := &volume_server_pb.VolumeServerLeaveResponse{}
+
+ vs.StopHeartbeat()
+
+ return resp, nil
+
+}
+
func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {
resp := &volume_server_pb.VolumeNeedleStatusResponse{}
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 3a1b95f26..0c0cc39c1 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,6 +2,7 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"time"
"google.golang.org/grpc"
@@ -21,6 +22,27 @@ import (
func (vs *VolumeServer) GetMaster() string {
return vs.currentMaster
}
+
+func (vs *VolumeServer) checkWithMaster() (err error) {
+ for _, master := range vs.SeedMasterNodes {
+ err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", master, err)
+ }
+ vs.MetricsAddress, vs.MetricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ backend.LoadFromPbStorageBackends(resp.StorageBackends)
+ return nil
+ })
+ if err == nil {
+ return
+ } else {
+ glog.V(0).Infof("checkWithMaster %s: %v", master, err)
+ }
+ }
+ return
+}
+
func (vs *VolumeServer) heartbeat() {
glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes)
@@ -31,7 +53,7 @@ func (vs *VolumeServer) heartbeat() {
var err error
var newLeader string
- for {
+ for vs.isHeartbeating {
for _, master := range vs.SeedMasterNodes {
if newLeader != "" {
// the new leader may actually is the same master
@@ -52,20 +74,35 @@ func (vs *VolumeServer) heartbeat() {
newLeader = ""
vs.store.MasterAddress = ""
}
+ if !vs.isHeartbeating {
+ break
+ }
}
}
}
+func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
+ if !vs.isHeartbeating {
+ return true
+ }
+ vs.isHeartbeating = false
+ vs.stopChan <- true
+ return false
+}
+
func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
- grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
}
defer grpcConection.Close()
client := master_pb.NewSeaweedClient(grpcConection)
- stream, err := client.SendHeartbeat(context.Background())
+ stream, err := client.SendHeartbeat(ctx)
if err != nil {
glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
return "", err
@@ -96,13 +133,6 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
doneChan <- nil
return
}
- if in.GetMetricsAddress() != "" && vs.MetricsAddress != in.GetMetricsAddress() {
- vs.MetricsAddress = in.GetMetricsAddress()
- vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds())
- }
- if len(in.StorageBackends) > 0 {
- backend.LoadFromPbStorageBackends(in.StorageBackends)
- }
}
}()
@@ -168,14 +198,10 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
return "", err
}
case <-volumeTickChan:
- if vs.SendHeartbeat {
- glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
- if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
- return "", err
- }
- } else {
- glog.V(4).Infof("volume server %s:%d skip send heartbeat", vs.store.Ip, vs.store.Port)
+ glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
+ if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ return "", err
}
case <-ecShardTickChan:
glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
@@ -185,6 +211,8 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
}
case err = <-doneChan:
return
+ case <-vs.stopChan:
+ return
}
}
}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 6612e9045..c600da21e 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -31,7 +31,8 @@ type VolumeServer struct {
MetricsAddress string
MetricsIntervalSec int
fileSizeLimitBytes int64
- SendHeartbeat bool
+ isHeartbeating bool
+ stopChan chan bool
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -67,9 +68,13 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
- SendHeartbeat: true,
+ isHeartbeating: true,
+ stopChan: make(chan bool),
}
vs.SeedMasterNodes = masterNodes
+
+ vs.checkWithMaster()
+
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
@@ -93,10 +98,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
go vs.heartbeat()
hostAddress := fmt.Sprintf("%s:%d", ip, port)
- go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather,
- func() (addr string, intervalSeconds int) {
- return vs.MetricsAddress, vs.MetricsIntervalSec
- })
+ go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather, vs.MetricsAddress, vs.MetricsIntervalSec)
return vs
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 07289e880..bb04678d6 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -95,7 +95,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
// glog.V(4).Infoln("read bytes", count, "error", err)
if err != nil || count < 0 {
- glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
+ glog.V(3).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
w.WriteHeader(http.StatusNotFound)
return
}
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 57723ab0b..121c0d2bb 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -259,7 +259,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string)
dir, name := util.FullPath(fullFilePath).DirAndName()
- return filer_pb.Remove(fs, dir, name, true, false, false, false, fs.signature)
+ return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
}
@@ -480,7 +480,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
f.reader = nil
}
if f.reader == nil {
- chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32)
+ chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize)
}
@@ -552,9 +552,9 @@ func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
var err error
switch whence {
- case 0:
+ case io.SeekStart:
f.off = 0
- case 2:
+ case io.SeekEnd:
if fi, err := f.fs.stat(ctx, f.name); err != nil {
return 0, err
} else {