diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 23 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_rename.go | 4 | ||||
| -rw-r--r-- | weed/server/filer_grpc_server_sub_meta.go | 12 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 2 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_autochunk.go | 3 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_cipher.go | 2 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 2 | ||||
| -rw-r--r-- | weed/server/master_server.go | 3 | ||||
| -rw-r--r-- | weed/server/volume_grpc_client_to_master.go | 14 | ||||
| -rw-r--r-- | weed/server/volume_server.go | 4 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 11 |
11 files changed, 48 insertions, 32 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 48e9253f0..405742e1e 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -171,7 +171,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr FullPath: util.JoinPath(req.Directory, req.Entry.Name), Attr: filer2.PbToEntryAttribute(req.Entry.Attributes), Chunks: chunks, - }, req.OExcl, req.IsFromOtherCluster) + }, req.OExcl, req.IsFromOtherCluster, req.Signatures) if createErr == nil { fs.filer.DeleteChunks(garbage) @@ -235,28 +235,26 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err) } - fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster) + fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures) return &filer_pb.UpdateEntryResponse{}, err } func (fs *FilerServer) cleanupChunks(existingEntry *filer2.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { - chunks = newEntry.Chunks // remove old chunks if not included in the new ones if existingEntry != nil { garbage, err = filer2.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks) if err != nil { - return chunks, nil, fmt.Errorf("MinusChunks: %v", err) + return newEntry.Chunks, nil, fmt.Errorf("MinusChunks: %v", err) } } // files with manifest chunks are usually large and append only, skip calculating covered chunks - var coveredChunks []*filer_pb.FileChunk - if !filer2.HasChunkManifest(newEntry.Chunks) { - chunks, coveredChunks = filer2.CompactFileChunks(fs.lookupFileId, newEntry.Chunks) - garbage = append(garbage, coveredChunks...) - } + manifestChunks, nonManifestChunks := filer2.SeparateManifestChunks(newEntry.Chunks) + + chunks, coveredChunks := filer2.CompactFileChunks(fs.lookupFileId, nonManifestChunks) + garbage = append(garbage, coveredChunks...) chunks, err = filer2.MaybeManifestize(fs.saveAsChunk( newEntry.Attributes.Replication, @@ -268,6 +266,9 @@ func (fs *FilerServer) cleanupChunks(existingEntry *filer2.Entry, newEntry *file // not good, but should be ok glog.V(0).Infof("MaybeManifestize: %v", err) } + + chunks = append(chunks, manifestChunks...) + return } @@ -311,7 +312,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo glog.V(0).Infof("MaybeManifestize: %v", err) } - err = fs.filer.CreateEntry(context.Background(), entry, false, false) + err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil) return &filer_pb.AppendToEntryResponse{}, err } @@ -320,7 +321,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr glog.V(4).Infof("DeleteEntry %v", req) - err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster) + err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, nil) resp = &filer_pb.DeleteEntryResponse{} if err != nil { resp.Error = err.Error() diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 9642fec24..cbb71682c 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -110,7 +110,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat Attr: entry.Attr, Chunks: entry.Chunks, } - createErr := fs.filer.CreateEntry(ctx, newEntry, false, false) + createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, nil) if createErr != nil { return createErr } @@ -124,7 +124,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat } // delete old entry - deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false) + deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, nil) if deleteErr != nil { return deleteErr } diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 4341f2091..2ad12e9c8 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -24,7 +24,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName) + eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName, req.Signature) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -59,7 +59,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName) + eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName, req.Signature) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) @@ -104,9 +104,15 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati } } -func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { +func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + for _, sig := range eventNotification.Signatures { + if sig == clientSignature && clientSignature != 0 { + return nil + } + } + // get complete path to the file or directory var entryName string if eventNotification.OldEntry != nil { diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index d22376a45..0091ae3ce 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -111,7 +111,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { objectPath = objectPath[0 : len(objectPath)-1] } - err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false) + err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil) if err != nil { glog.V(1).Infoln("deleting", objectPath, ":", err.Error()) httpStatus := http.StatusInternalServerError diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 0365ea3ab..1d037f85f 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -148,7 +148,6 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa crTime = existingEntry.Crtime } - glog.V(4).Infoln("saving", path) entry := &filer2.Entry{ FullPath: util.FullPath(path), @@ -172,7 +171,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa Size: chunkOffset, } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { fs.filer.DeleteChunks(entry.Chunks) replyerr = dbErr filerResult.Error = dbErr.Error() diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 6ec06d3de..670399425 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -80,7 +80,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht Size: int64(pu.OriginalDataSize), } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { fs.filer.DeleteChunks(entry.Chunks) err = dbErr filerResult.Error = dbErr.Error() diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 579b30a6a..d310a27d4 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -87,7 +87,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ dn.UpAdjustMaxVolumeCountDelta(delta) } - glog.V(5).Infof("master received heartbeat %s", heartbeat.String()) + glog.V(4).Infof("master received heartbeat %s", heartbeat.String()) message := &master_pb.VolumeLocation{ Url: dn.Url(), PublicUrl: dn.PublicUrl, diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 377fac26f..ae59636ad 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -7,7 +7,6 @@ import ( "net/url" "os" "regexp" - "strconv" "strings" "sync" "time" @@ -210,7 +209,7 @@ func (ms *MasterServer) startAdminScripts() { scriptLines = append(scriptLines, "unlock") } - masterAddress := "localhost:" + strconv.Itoa(ms.option.Port) + masterAddress := fmt.Sprintf("%s:%d",ms.option.Host, ms.option.Port) var shellOptions shell.ShellOptions shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master") diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index c62a4a388..3a1b95f26 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -168,13 +168,17 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi return "", err } case <-volumeTickChan: - glog.V(5).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) - if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) - return "", err + if vs.SendHeartbeat { + glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) + if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + return "", err + } + } else { + glog.V(4).Infof("volume server %s:%d skip send heartbeat", vs.store.Ip, vs.store.Port) } case <-ecShardTickChan: - glog.V(5).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) + glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) return "", err diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 3af37b491..6612e9045 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -31,6 +31,7 @@ type VolumeServer struct { MetricsAddress string MetricsIntervalSec int fileSizeLimitBytes int64 + SendHeartbeat bool } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, @@ -66,16 +67,17 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, + SendHeartbeat: true, } vs.SeedMasterNodes = masterNodes vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) handleStaticResources(adminMux) + adminMux.HandleFunc("/status", vs.statusHandler) if signingKey == "" || enableUiAccess { // only expose the volume server details for safe environments adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) - adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) /* adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 3d2629c19..f06189e34 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -70,6 +70,7 @@ type WebDavFileSystem struct { filer *filer2.Filer grpcDialOption grpc.DialOption chunkCache *chunk_cache.TieredChunkCache + signature int32 } type FileInfo struct { @@ -103,6 +104,7 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { return &WebDavFileSystem{ option: option, chunkCache: chunkCache, + signature: util.RandomInt32(), }, nil } @@ -165,6 +167,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm Gid: fs.option.Gid, }, }, + Signatures: []int32{fs.signature}, } glog.V(1).Infof("mkdir: %v", request) @@ -216,6 +219,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f TtlSec: 0, }, }, + Signatures: []int32{fs.signature}, }); err != nil { return fmt.Errorf("create %s: %v", fullFilePath, err) } @@ -255,7 +259,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) dir, name := util.FullPath(fullFilePath).DirAndName() - return filer_pb.Remove(fs, dir, name, true, false, false, false) + return filer_pb.Remove(fs, dir, name, true, false, false, false, fs.signature) } @@ -422,8 +426,9 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { f.entry.Attributes.Replication = replication request := &filer_pb.UpdateEntryRequest{ - Directory: dir, - Entry: f.entry, + Directory: dir, + Entry: f.entry, + Signatures: []int32{f.fs.signature}, } if _, err := client.UpdateEntry(ctx, request); err != nil { |
