| author | ustuzhanin <55892859+ustuzhanin@users.noreply.github.com> | 2020-10-02 22:47:25 +0500 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-10-02 22:47:25 +0500 |
| commit | 3e0a79ef050dba9e5347d20537ef562cc4b30b62 (patch) | |
| tree | e0b42e531d18136d9e272258187a305690ee2b4d /weed/server | |
| parent | cbd80253e33688f55c02dd29c994a3ee6eac3d6c (diff) | |
| parent | 9ab98fa912814686b3035a97b5173c1628fbc0fc (diff) | |
Merge pull request #1 from chrislusf/master
Merge upstream
Diffstat (limited to 'weed/server')
29 files changed, 579 insertions, 529 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 48e9253f0..ecd23413f 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -6,10 +6,9 @@ import (
 	"os"
 	"path/filepath"
 	"strconv"
-	"strings"
 	"time"
 
-	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -35,9 +34,11 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
 		Entry: &filer_pb.Entry{
 			Name:        req.Name,
 			IsDirectory: entry.IsDirectory(),
-			Attributes:  filer2.EntryAttributeToPb(entry),
+			Attributes:  filer.EntryAttributeToPb(entry),
 			Chunks:      entry.Chunks,
 			Extended:    entry.Extended,
+			HardLinkId:      entry.HardLinkId,
+			HardLinkCounter: entry.HardLinkCounter,
 		},
 	}, nil
 }
@@ -51,7 +52,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
 		limit = fs.option.DirListingLimit
 	}
 
-	paginationLimit := filer2.PaginationSize
+	paginationLimit := filer.PaginationSize
 	if limit < paginationLimit {
 		paginationLimit = limit
 	}
@@ -59,7 +60,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
 	lastFileName := req.StartFromFileName
 	includeLastFile := req.InclusiveStartFrom
 	for limit > 0 {
-		entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
+		entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix)
 
 		if err != nil {
 			return err
@@ -74,19 +75,15 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
 
 			lastFileName = entry.Name()
 
-			if req.Prefix != "" {
-				if !strings.HasPrefix(entry.Name(), req.Prefix) {
-					continue
-				}
-			}
-
 			if err := stream.Send(&filer_pb.ListEntriesResponse{
 				Entry: &filer_pb.Entry{
 					Name:        entry.Name(),
 					IsDirectory: entry.IsDirectory(),
 					Chunks:      entry.Chunks,
-					Attributes:  filer2.EntryAttributeToPb(entry),
+					Attributes:  filer.EntryAttributeToPb(entry),
 					Extended:    entry.Extended,
+					HardLinkId:      entry.HardLinkId,
+					HardLinkCounter: entry.HardLinkCounter,
 				},
 			}); err != nil {
 				return err
@@ -161,17 +158,14 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
 		return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
 	}
 
-	if req.Entry.Attributes == nil {
-		glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name))
-		resp.Error = fmt.Sprintf("can not create entry with empty attributes")
-		return
-	}
-
-	createErr := fs.filer.CreateEntry(ctx, &filer2.Entry{
+	createErr := fs.filer.CreateEntry(ctx, &filer.Entry{
 		FullPath: util.JoinPath(req.Directory, req.Entry.Name),
-		Attr:     filer2.PbToEntryAttribute(req.Entry.Attributes),
+		Attr:     filer.PbToEntryAttribute(req.Entry.Attributes),
 		Chunks:   chunks,
-	}, req.OExcl, req.IsFromOtherCluster)
+		Extended: req.Entry.Extended,
+		HardLinkId:      filer.HardLinkId(req.Entry.HardLinkId),
+		HardLinkCounter: req.Entry.HardLinkCounter,
+	}, req.OExcl, req.IsFromOtherCluster, req.Signatures)
 
 	if createErr == nil {
 		fs.filer.DeleteChunks(garbage)
@@ -198,11 +192,13 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
 		return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
 	}
 
-	newEntry := &filer2.Entry{
+	newEntry := &filer.Entry{
 		FullPath: util.JoinPath(req.Directory, req.Entry.Name),
 		Attr:     entry.Attr,
 		Extended: req.Entry.Extended,
 		Chunks:   chunks,
+		HardLinkId:      filer.HardLinkId(req.Entry.HardLinkId),
+		HardLinkCounter: req.Entry.HardLinkCounter,
 	}
 
 	glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v",
@@ -225,49 +221,53 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
 	}
 
-	if filer2.EqualEntry(entry, newEntry) {
+	if filer.EqualEntry(entry, newEntry) {
 		return &filer_pb.UpdateEntryResponse{}, err
 	}
 
 	if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
 		fs.filer.DeleteChunks(garbage)
+
+		fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures)
+
 	} else {
 		glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
 	}
 
-	fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster)
-
 	return &filer_pb.UpdateEntryResponse{}, err
 }
 
-func (fs *FilerServer) cleanupChunks(existingEntry *filer2.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
-	chunks = newEntry.Chunks
+func (fs *FilerServer) cleanupChunks(existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
 
 	// remove old chunks if not included in the new ones
 	if existingEntry != nil {
-		garbage, err = filer2.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks)
+		garbage, err = filer.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks)
 		if err != nil {
-			return chunks, nil, fmt.Errorf("MinusChunks: %v", err)
+			return newEntry.Chunks, nil, fmt.Errorf("MinusChunks: %v", err)
 		}
 	}
 
 	// files with manifest chunks are usually large and append only, skip calculating covered chunks
-	var coveredChunks []*filer_pb.FileChunk
-	if !filer2.HasChunkManifest(newEntry.Chunks) {
-		chunks, coveredChunks = filer2.CompactFileChunks(fs.lookupFileId, newEntry.Chunks)
-		garbage = append(garbage, coveredChunks...)
+	manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.Chunks)
+
+	chunks, coveredChunks := filer.CompactFileChunks(fs.lookupFileId, nonManifestChunks)
+	garbage = append(garbage, coveredChunks...)
+
+	if newEntry.Attributes != nil {
+		chunks, err = filer.MaybeManifestize(fs.saveAsChunk(
+			newEntry.Attributes.Replication,
+			newEntry.Attributes.Collection,
+			"",
+			needle.SecondsToTTL(newEntry.Attributes.TtlSec),
+			false), chunks)
+		if err != nil {
+			// not good, but should be ok
+			glog.V(0).Infof("MaybeManifestize: %v", err)
+		}
 	}
 
-	chunks, err = filer2.MaybeManifestize(fs.saveAsChunk(
-		newEntry.Attributes.Replication,
-		newEntry.Attributes.Collection,
-		"",
-		needle.SecondsToTTL(newEntry.Attributes.TtlSec),
-		false), chunks)
-	if err != nil {
-		// not good, but should be ok
-		glog.V(0).Infof("MaybeManifestize: %v", err)
-	}
+	chunks = append(chunks, manifestChunks...)
+
 	return
 }
@@ -279,9 +279,9 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
 	var offset int64 = 0
 	entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
 	if err == filer_pb.ErrNotFound {
-		entry = &filer2.Entry{
+		entry = &filer.Entry{
 			FullPath: fullpath,
-			Attr: filer2.Attr{
+			Attr: filer.Attr{
 				Crtime: time.Now(),
 				Mtime:  time.Now(),
 				Mode:   os.FileMode(0644),
@@ -290,7 +290,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
 			},
 		}
 	} else {
-		offset = int64(filer2.TotalSize(entry.Chunks))
+		offset = int64(filer.TotalSize(entry.Chunks))
 	}
 
 	for _, chunk := range req.Chunks {
@@ -300,7 +300,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
 
 	entry.Chunks = append(entry.Chunks, req.Chunks...)
 
-	entry.Chunks, err = filer2.MaybeManifestize(fs.saveAsChunk(
+	entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(
 		entry.Replication,
 		entry.Collection,
 		"",
@@ -311,7 +311,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
 		glog.V(0).Infof("MaybeManifestize: %v", err)
 	}
 
-	err = fs.filer.CreateEntry(context.Background(), entry, false, false)
+	err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil)
 
 	return &filer_pb.AppendToEntryResponse{}, err
 }
@@ -320,7 +320,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
 
 	glog.V(4).Infof("DeleteEntry %v", req)
 
-	err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster)
+	err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures)
 	resp = &filer_pb.DeleteEntryResponse{}
 	if err != nil {
 		resp.Error = err.Error()
@@ -426,12 +426,15 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
 func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
 
 	t := &filer_pb.GetFilerConfigurationResponse{
-		Masters:     fs.option.Masters,
-		Collection:  fs.option.Collection,
-		Replication: fs.option.DefaultReplication,
-		MaxMb:       uint32(fs.option.MaxMB),
-		DirBuckets:  fs.filer.DirBucketsPath,
-		Cipher:      fs.filer.Cipher,
+		Masters:            fs.option.Masters,
+		Collection:         fs.option.Collection,
+		Replication:        fs.option.DefaultReplication,
+		MaxMb:              uint32(fs.option.MaxMB),
+		DirBuckets:         fs.filer.DirBucketsPath,
+		Cipher:             fs.filer.Cipher,
+		Signature:          fs.filer.Signature,
+		MetricsAddress:     fs.metricsAddress,
+		MetricsIntervalSec: int32(fs.metricsIntervalSec),
 	}
 
 	glog.V(4).Infof("GetFilerConfiguration: %v", t)
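The filer_grpc_server.go changes above do three things: entry responses now carry HardLinkId/HardLinkCounter, the create/update/delete RPCs thread a req.Signatures list through to the store, and prefix filtering moved out of the server loop into ListDirectoryEntries itself. A minimal sketch of the server-side filtering that the removed strings.HasPrefix block used to perform (the helper name is hypothetical; the real change pushes this work down into each store):

```go
package main

import (
	"fmt"
	"strings"
)

// filterByPrefix mimics the removed block: the server fetched a page of
// entries and skipped any whose name lacked the requested prefix.
func filterByPrefix(names []string, prefix string) (matched []string) {
	for _, name := range names {
		if prefix == "" || strings.HasPrefix(name, prefix) {
			matched = append(matched, name)
		}
	}
	return matched
}

func main() {
	page := []string{"app.log", "app.conf", "backup.tar"}
	fmt.Println(filterByPrefix(page, "app.")) // [app.log app.conf]
}
```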
diff --git a/weed/server/filer_grpc_server_kv.go b/weed/server/filer_grpc_server_kv.go
new file mode 100644
index 000000000..3cb47115e
--- /dev/null
+++ b/weed/server/filer_grpc_server_kv.go
@@ -0,0 +1,42 @@
+package weed_server
+
+import (
+	"context"
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (fs *FilerServer) KvGet(ctx context.Context, req *filer_pb.KvGetRequest) (*filer_pb.KvGetResponse, error) {
+
+	value, err := fs.filer.Store.KvGet(ctx, req.Key)
+	if err == filer.ErrKvNotFound {
+		return &filer_pb.KvGetResponse{}, nil
+	}
+
+	if err != nil {
+		return &filer_pb.KvGetResponse{Error: err.Error()}, nil
+	}
+
+	return &filer_pb.KvGetResponse{
+		Value: value,
+	}, nil
+
+}
+
+// KvPut sets the key~value. if empty value, delete the kv entry
+func (fs *FilerServer) KvPut(ctx context.Context, req *filer_pb.KvPutRequest) (*filer_pb.KvPutResponse, error) {
+
+	if len(req.Value) == 0 {
+		if err := fs.filer.Store.KvDelete(ctx, req.Key); err != nil {
+			return &filer_pb.KvPutResponse{Error: err.Error()}, nil
+		}
+	}
+
+	err := fs.filer.Store.KvPut(ctx, req.Key, req.Value)
+	if err != nil {
+		return &filer_pb.KvPutResponse{Error: err.Error()}, nil
+	}
+
+	return &filer_pb.KvPutResponse{}, nil
+
+}
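The new KV endpoints above are a thin pass-through to the filer store: a missing key yields an empty response rather than an error, an empty value acts as a delete, and (as committed) a successful KvDelete still falls through to KvPut with the empty value. A hedged sketch of a client round-trip, assuming a dialed filer gRPC connection; the 18888 port is an assumption, not taken from this diff:

```go
package main

import (
	"context"
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/grpc"
)

func main() {
	// assumption: the filer's gRPC endpoint is reachable on this port
	conn, err := grpc.Dial("localhost:18888", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := filer_pb.NewSeaweedFilerClient(conn)
	ctx := context.Background()

	// store a small piece of metadata
	if _, err := client.KvPut(ctx, &filer_pb.KvPutRequest{Key: []byte("demo"), Value: []byte("v1")}); err != nil {
		log.Fatal(err)
	}

	// read it back; a not-found key comes back with an empty Value, no error
	resp, err := client.KvGet(ctx, &filer_pb.KvGetRequest{Key: []byte("demo")})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("value=%q serverError=%q\n", resp.Value, resp.Error)
}
```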
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 9642fec24..f9ddeb600 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -5,7 +5,7 @@ import (
 	"fmt"
 	"path/filepath"
 
-	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
@@ -43,7 +43,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
 	return &filer_pb.AtomicRenameEntryResponse{}, nil
 }
 
-func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
 
 	if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events, func() error {
 		if entry.IsDirectory() {
@@ -59,7 +59,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e
 	return nil
 }
 
-func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
 
 	currentDirPath := oldParent.Child(entry.Name())
 	newDirPath := newParent.Child(newName)
@@ -70,7 +70,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
 	includeLastFile := false
 	for {
-		entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024)
+		entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "")
 		if err != nil {
 			return err
 		}
@@ -92,7 +92,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
 	return nil
 }
 
-func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents,
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents,
 	moveFolderSubEntries func() error) error {
 
 	oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
@@ -105,12 +105,13 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
 	}
 
 	// add to new directory
-	newEntry := &filer2.Entry{
+	newEntry := &filer.Entry{
 		FullPath: newPath,
 		Attr:     entry.Attr,
 		Chunks:   entry.Chunks,
+		Extended: entry.Extended,
 	}
-	createErr := fs.filer.CreateEntry(ctx, newEntry, false, false)
+	createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, nil)
 	if createErr != nil {
 		return createErr
 	}
@@ -124,7 +125,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
 	}
 
 	// delete old entry
-	deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false)
+	deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, nil)
 	if deleteErr != nil {
 		return deleteErr
 	}
@@ -136,6 +137,6 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
 }
 
 type MoveEvents struct {
-	oldEntries []*filer2.Entry
-	newEntries []*filer2.Entry
+	oldEntries []*filer.Entry
+	newEntries []*filer.Entry
 }
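moveFolderSubEntries above pages through the old directory 1024 entries at a time, using the last seen file name as a cursor (and now passing the new, empty, prefix argument). A self-contained sketch of that cursor loop; list() is a stand-in for fs.filer.ListDirectoryEntries over a sorted name set:

```go
package main

import "fmt"

// list returns up to limit sorted names at or after the cursor; includeCursor
// mirrors the includeLastFile flag in the diff.
func list(all []string, cursor string, includeCursor bool, limit int) []string {
	var out []string
	for _, name := range all {
		if name < cursor || (name == cursor && !includeCursor) {
			continue
		}
		if len(out) == limit {
			break
		}
		out = append(out, name)
	}
	return out
}

func main() {
	all := []string{"a", "b", "c", "d", "e"}
	cursor, include := "", true
	for {
		page := list(all, cursor, include, 2)
		if len(page) == 0 {
			break
		}
		fmt.Println(page)
		cursor, include = page[len(page)-1], false // resume after the last name
	}
}
```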
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 4341f2091..634fb5211 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -2,12 +2,13 @@ package weed_server
 
 import (
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/util/log_buffer"
 	"strings"
 	"time"
 
 	"github.com/golang/protobuf/proto"
 
-	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
 	"github.com/chrislusf/seaweedfs/weed/util"
@@ -24,7 +25,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
 	lastReadTime := time.Unix(0, req.SinceNs)
 	glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
 
-	eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName)
+	eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
 
 	eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
 
@@ -37,12 +38,21 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
 		lastReadTime = time.Unix(0, processedTsNs)
 	}
 
-	err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
-		fs.filer.MetaAggregator.ListenersLock.Lock()
-		fs.filer.MetaAggregator.ListenersCond.Wait()
-		fs.filer.MetaAggregator.ListenersLock.Unlock()
-		return true
-	}, eachLogEntryFn)
+	for {
+		lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+			fs.filer.MetaAggregator.ListenersLock.Lock()
+			fs.filer.MetaAggregator.ListenersCond.Wait()
+			fs.filer.MetaAggregator.ListenersLock.Unlock()
+			return true
+		}, eachLogEntryFn)
+		if err != nil {
+			glog.Errorf("processed to %v: %v", lastReadTime, err)
+			time.Sleep(3127 * time.Millisecond)
+			if err != log_buffer.ResumeError {
+				break
+			}
+		}
+	}
 
 	return err
 
@@ -59,30 +69,37 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
 	lastReadTime := time.Unix(0, req.SinceNs)
 	glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
 
-	eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName)
+	eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
 
 	eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
 
-	if _, ok := fs.filer.Store.ActualStore.(filer2.FilerLocalStore); ok {
-		// println("reading from persisted logs ...")
-		processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
-		if err != nil {
-			return fmt.Errorf("reading from persisted logs: %v", err)
-		}
+	// println("reading from persisted logs ...")
+	processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+	if err != nil {
+		return fmt.Errorf("reading from persisted logs: %v", err)
+	}
 
-		if processedTsNs != 0 {
-			lastReadTime = time.Unix(0, processedTsNs)
-		}
-		glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+	if processedTsNs != 0 {
+		lastReadTime = time.Unix(0, processedTsNs)
 	}
+	glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
 
 	// println("reading from in memory logs ...")
-	err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
-		fs.listenersLock.Lock()
-		fs.listenersCond.Wait()
-		fs.listenersLock.Unlock()
-		return true
-	}, eachLogEntryFn)
+	for {
+		lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+			fs.listenersLock.Lock()
+			fs.listenersCond.Wait()
+			fs.listenersLock.Unlock()
+			return true
+		}, eachLogEntryFn)
+		if err != nil {
+			glog.Errorf("processed to %v: %v", lastReadTime, err)
+			time.Sleep(3127 * time.Millisecond)
+			if err != log_buffer.ResumeError {
+				break
+			}
+		}
+	}
 
 	return err
 
@@ -104,9 +121,22 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati
 	}
 }
 
-func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
 	return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
 
+		foundSelf := false
+		for _, sig := range eventNotification.Signatures {
+			if sig == clientSignature && clientSignature != 0 {
+				return nil
+			}
+			if sig == fs.filer.Signature {
+				foundSelf = true
+			}
+		}
+		if !foundSelf {
+			eventNotification.Signatures = append(eventNotification.Signatures, fs.filer.Signature)
+		}
+
 		// get complete path to the file or directory
 		var entryName string
 		if eventNotification.OldEntry != nil {
@@ -118,7 +148,7 @@ func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream file
 		fullpath := util.Join(dirPath, entryName)
 
 		// skip on filer internal meta logs
-		if strings.HasPrefix(fullpath, filer2.SystemLogDir) {
+		if strings.HasPrefix(fullpath, filer.SystemLogDir) {
 			return nil
 		}
 
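The most consequential change in filer_grpc_server_sub_meta.go above is the signature check added to eachEventNotificationFn: every notification carries the signatures of the filers that have handled it, events already signed by the subscribing client are dropped (so a replicated change is not echoed back to its origin), and the serving filer stamps itself at most once. A minimal sketch of that logic in isolation:

```go
package main

import "fmt"

// stampOrSkip returns the updated signature list and whether to forward the
// event, following the same rules as the diff above.
func stampOrSkip(sigs []int32, selfSig, clientSig int32) ([]int32, bool) {
	foundSelf := false
	for _, sig := range sigs {
		if clientSig != 0 && sig == clientSig {
			return sigs, false // the subscriber originated or already saw this event
		}
		if sig == selfSig {
			foundSelf = true
		}
	}
	if !foundSelf {
		sigs = append(sigs, selfSig) // record that this filer forwarded it
	}
	return sigs, true
}

func main() {
	sigs, send := stampOrSkip([]int32{101}, 202, 101)
	fmt.Println(sigs, send) // [101] false: skip, it came from the subscriber
	sigs, send = stampOrSkip([]int32{101}, 202, 303)
	fmt.Println(sigs, send) // [101 202] true: forward, stamped with 202
}
```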
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 6995c7cfe..59c149cef 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -3,6 +3,7 @@ package weed_server
 
 import (
 	"context"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/stats"
 	"net/http"
 	"os"
 	"sync"
@@ -15,19 +16,19 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
-	"github.com/chrislusf/seaweedfs/weed/stats"
 	"github.com/chrislusf/seaweedfs/weed/util"
 
-	"github.com/chrislusf/seaweedfs/weed/filer2"
-	_ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
-	_ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
-	_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
-	_ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
-	_ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb"
-	_ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
-	_ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
-	_ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
-	_ "github.com/chrislusf/seaweedfs/weed/filer2/redis2"
+	"github.com/chrislusf/seaweedfs/weed/filer"
+	_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+	_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+	_ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+	_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+	_ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+	_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+	_ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+	_ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+	_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+	_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/notification"
 	_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
@@ -58,9 +59,13 @@ type FilerOption struct {
 type FilerServer struct {
 	option *FilerOption
 	secret security.SigningKey
-	filer  *filer2.Filer
+	filer  *filer.Filer
 	grpcDialOption grpc.DialOption
 
+	// metrics read from the master
+	metricsAddress     string
+	metricsIntervalSec int
+
 	// notifying clients
 	listenersLock sync.Mutex
 	listenersCond *sync.Cond
@@ -82,13 +87,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
 		glog.Fatal("master list is required!")
 	}
 
-	fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
+	fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
 		fs.listenersCond.Broadcast()
 	})
 	fs.filer.Cipher = option.Cipher
 
-	maybeStartMetrics(fs, option)
+	fs.checkWithMaster()
+
+	go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
 	go fs.filer.KeepConnectedToMaster()
 
 	v := util.GetViper()
@@ -130,9 +136,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
 	return fs, nil
 }
 
-func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
+func (fs *FilerServer) checkWithMaster() {
 
-	for _, master := range option.Masters {
+	for _, master := range fs.option.Masters {
 		_, err := pb.ParseFilerGrpcAddress(master)
 		if err != nil {
 			glog.Fatalf("invalid master address %s: %v", master, err)
@@ -140,12 +146,19 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
 		}
 	}
 
 	isConnected := false
-	var metricsAddress string
-	var metricsIntervalSec int
-	var readErr error
 	for !isConnected {
-		for _, master := range option.Masters {
-			metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
+		for _, master := range fs.option.Masters {
+			readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+				resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+				if err != nil {
+					return fmt.Errorf("get master %s configuration: %v", master, err)
+				}
+				fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+				if fs.option.DefaultReplication == "" {
+					fs.option.DefaultReplication = resp.DefaultReplication
+				}
+				return nil
+			})
 			if readErr == nil {
 				isConnected = true
 			} else {
@@ -153,23 +166,5 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
 			}
 		}
 	}
-	if metricsAddress == "" && metricsIntervalSec <= 0 {
-		return
-	}
-	go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
-		func() (addr string, intervalSeconds int) {
-			return metricsAddress, metricsIntervalSec
-		})
-}
-
-func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
-	err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
-		resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
-		if err != nil {
-			return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
-		}
-		metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
-		return nil
-	})
-	return
 }
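maybeStartMetrics became checkWithMaster above: the filer now blocks at startup until some master returns its configuration, from which it takes the metrics push target and, when unset locally, the default replication. A compact sketch of that bootstrap loop with a pluggable fetch function standing in for the master gRPC call:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

type masterConfig struct {
	MetricsAddress     string
	DefaultReplication string
}

// fetchConfigUntilConnected cycles through the master list until one answers,
// mirroring the isConnected loop in checkWithMaster.
func fetchConfigUntilConnected(masters []string, fetch func(string) (masterConfig, error)) masterConfig {
	for {
		for _, m := range masters {
			cfg, err := fetch(m)
			if err != nil {
				fmt.Printf("checkWithMaster %s: %v\n", m, err)
				continue // try the next master
			}
			return cfg
		}
		time.Sleep(time.Second) // none reachable yet; retry the whole list
	}
}

func main() {
	calls := 0
	cfg := fetchConfigUntilConnected([]string{"m1:9333", "m2:9333"}, func(m string) (masterConfig, error) {
		calls++
		if calls < 2 {
			return masterConfig{}, errors.New("not ready")
		}
		return masterConfig{MetricsAddress: "push:9091", DefaultReplication: "001"}, nil
	})
	fmt.Printf("connected: %+v\n", cfg)
}
```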
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index b6bfc3b04..18f78881c 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -1,6 +1,7 @@
 package weed_server
 
 import (
+	"github.com/chrislusf/seaweedfs/weed/util"
 	"net/http"
 	"time"
 
@@ -8,6 +9,7 @@ import (
 )
 
 func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
 	start := time.Now()
 	switch r.Method {
 	case "GET":
@@ -34,6 +36,7 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
 	start := time.Now()
 	switch r.Method {
 	case "GET":
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 657158c2f..fbd45d6b9 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -11,7 +11,7 @@ import (
 	"strings"
 	"time"
 
-	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/images"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -94,7 +94,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
 	}
 
 	// set etag
-	etag := filer2.ETagEntry(entry)
+	etag := filer.ETagEntry(entry)
 	if inm := r.Header.Get("If-None-Match"); inm == "\""+etag+"\"" {
 		w.WriteHeader(http.StatusNotModified)
 		return
@@ -105,17 +105,17 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
 	adjustHeaderContentDisposition(w, r, filename)
 
 	if r.Method == "HEAD" {
-		w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
+		w.Header().Set("Content-Length", strconv.FormatInt(int64(entry.Size()), 10))
 		return
 	}
 
-	totalSize := int64(filer2.TotalSize(entry.Chunks))
+	totalSize := int64(entry.Size())
 
 	if rangeReq := r.Header.Get("Range"); rangeReq == "" {
 		ext := filepath.Ext(filename)
 		width, height, mode, shouldResize := shouldResizeImages(ext, r)
 		if shouldResize {
-			data, err := filer2.ReadAll(fs.filer.MasterClient, entry.Chunks)
+			data, err := filer.ReadAll(fs.filer.MasterClient, entry.Chunks)
 			if err != nil {
 				glog.Errorf("failed to read %s: %v", path, err)
 				w.WriteHeader(http.StatusNotModified)
@@ -128,7 +128,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
 	}
 
 	processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
-		return filer2.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
+		return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
 	})
 }
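filerHandler and readonlyFilerHandler above now stamp a Server header on every response, and GetOrHeadHandler sizes responses from entry.Size() while streaming Range windows through filer.StreamContent. The header change, expressed as a reusable wrapper (the version string here is illustrative, not taken from the diff):

```go
package main

import (
	"fmt"
	"net/http"
)

// withServerHeader sets the Server header before any handler writes a body,
// the same effect as the two one-line additions in the diff.
func withServerHeader(version string, next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Server", "SeaweedFS Filer "+version)
		next(w, r)
	}
}

func main() {
	hello := func(w http.ResponseWriter, r *http.Request) { fmt.Fprintln(w, "ok") }
	http.Handle("/", withServerHeader("2.03", hello))
	http.ListenAndServe(":8888", nil)
}
```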
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index ae28fc1db..9ca0209f4 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -32,7 +32,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
 
 	lastFileName := r.FormValue("lastFileName")
 
-	entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit)
+	entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, "")
 
 	if err != nil {
 		glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index da66178ce..0091ae3ce 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -2,22 +2,11 @@ package weed_server
 
 import (
 	"context"
-	"crypto/md5"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"io"
-	"io/ioutil"
-	"mime"
 	"net/http"
-	"net/url"
 	"os"
-	filenamePath "path"
-	"strconv"
 	"strings"
 	"time"
 
-	"github.com/chrislusf/seaweedfs/weed/filer2"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -98,206 +87,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 		ttlSeconds = int32(ttl.Minutes()) * 60
 	}
 
-	if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked {
-		return
-	}
+	fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync)
 
-	if fs.option.Cipher {
-		reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync)
-		if err != nil {
-			writeJsonError(w, r, http.StatusInternalServerError, err)
-		} else if reply != nil {
-			writeJsonQuiet(w, r, http.StatusCreated, reply)
-		}
-
-		return
-	}
-
-	fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
-
-	if err != nil || fileId == "" || urlLocation == "" {
-		glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
-		writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter))
-		return
-	}
-
-	glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
-
-	u, _ := url.Parse(urlLocation)
-	ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
-	if err != nil {
-		return
-	}
-
-	if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil {
-		return
-	}
-
-	// send back post result
-	reply := FilerPostResult{
-		Name:  ret.Name,
-		Size:  int64(ret.Size),
-		Error: ret.Error,
-		Fid:   fileId,
-		Url:   urlLocation,
-	}
-	setEtag(w, ret.ETag)
-	writeJsonQuiet(w, r, http.StatusCreated, reply)
-}
-
-// update metadata in filer store
-func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string,
-	collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) {
-
-	stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
-	start := time.Now()
-	defer func() {
-		stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds())
-	}()
-
-	modeStr := r.URL.Query().Get("mode")
-	if modeStr == "" {
-		modeStr = "0660"
-	}
-	mode, err := strconv.ParseUint(modeStr, 8, 32)
-	if err != nil {
-		glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
-		mode = 0660
-	}
-
-	path := r.URL.Path
-	if strings.HasSuffix(path, "/") {
-		if ret.Name != "" {
-			path += ret.Name
-		}
-	}
-	existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
-	crTime := time.Now()
-	if err == nil && existingEntry != nil {
-		crTime = existingEntry.Crtime
-	}
-	entry := &filer2.Entry{
-		FullPath: util.FullPath(path),
-		Attr: filer2.Attr{
-			Mtime:       time.Now(),
-			Crtime:      crTime,
-			Mode:        os.FileMode(mode),
-			Uid:         OS_UID,
-			Gid:         OS_GID,
-			Replication: replication,
-			Collection:  collection,
-			TtlSec:      ttlSeconds,
-			Mime:        ret.Mime,
-			Md5:         md5value,
-		},
-		Chunks: []*filer_pb.FileChunk{{
-			FileId: fileId,
-			Size:   uint64(ret.Size),
-			Mtime:  time.Now().UnixNano(),
-			ETag:   ret.ETag,
-		}},
-	}
-	if entry.Attr.Mime == "" {
-		if ext := filenamePath.Ext(path); ext != "" {
-			entry.Attr.Mime = mime.TypeByExtension(ext)
-		}
-	}
-	// glog.V(4).Infof("saving %s => %+v", path, entry)
-	if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil {
-		fs.filer.DeleteChunks(entry.Chunks)
-		glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
-		writeJsonError(w, r, http.StatusInternalServerError, dbErr)
-		err = dbErr
-		return
-	}
-
-	return nil
-}
-
-// send request to volume server
-func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) {
-
-	stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
-	start := time.Now()
-	defer func() {
-		stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds())
-	}()
-
-	ret = &operation.UploadResult{}
-
-	md5Hash := md5.New()
-	body := r.Body
-	if r.Method == "PUT" {
-		// only PUT or large chunked files has Md5 in attributes
-		body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash))
-	}
-
-	request := &http.Request{
-		Method:        r.Method,
-		URL:           u,
-		Proto:         r.Proto,
-		ProtoMajor:    r.ProtoMajor,
-		ProtoMinor:    r.ProtoMinor,
-		Header:        r.Header,
-		Body:          body,
-		Host:          r.Host,
-		ContentLength: r.ContentLength,
-	}
-
-	if auth != "" {
-		request.Header.Set("Authorization", "BEARER "+string(auth))
-	}
-	resp, doErr := util.Do(request)
-	if doErr != nil {
-		glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method)
-		writeJsonError(w, r, http.StatusInternalServerError, doErr)
-		err = doErr
-		return
-	}
-	defer func() {
-		io.Copy(ioutil.Discard, resp.Body)
-		resp.Body.Close()
-	}()
-
-	respBody, raErr := ioutil.ReadAll(resp.Body)
-	if raErr != nil {
-		glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error())
-		writeJsonError(w, r, http.StatusInternalServerError, raErr)
-		err = raErr
-		return
-	}
-
-	glog.V(4).Infoln("post result", string(respBody))
-	unmarshalErr := json.Unmarshal(respBody, &ret)
-	if unmarshalErr != nil {
-		glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody))
-		writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr)
-		err = unmarshalErr
-		return
-	}
-	if ret.Error != "" {
-		err = errors.New(ret.Error)
-		glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
-		writeJsonError(w, r, http.StatusInternalServerError, err)
-		return
-	}
-	// find correct final path
-	path := r.URL.Path
-	if strings.HasSuffix(path, "/") {
-		if ret.Name != "" {
-			path += ret.Name
-		} else {
-			err = fmt.Errorf("can not to write to folder %s without a file name", path)
-			fs.filer.DeleteFileByFileId(fileId)
-			glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
-			writeJsonError(w, r, http.StatusInternalServerError, err)
-			return
-		}
-	}
-	// use filer calculated md5 ETag, instead of the volume server crc ETag
-	if r.Method == "PUT" {
-		md5value = md5Hash.Sum(nil)
-	}
-	ret.ETag = getEtag(resp)
-	return
 }
 
 // curl -X DELETE http://localhost:8888/path/to
@@ -320,7 +111,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
 		objectPath = objectPath[0 : len(objectPath)-1]
 	}
 
-	err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false)
+	err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil)
 	if err != nil {
 		glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
 		httpStatus := http.StatusInternalServerError
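Both the removed uploadToVolumeServer above and the chunked path that replaces it compute the filer-side MD5 by hashing the body as it streams past, via io.TeeReader, so an ETag or Content-MD5 can come from the filer without buffering the whole upload. The core idiom, reduced to a runnable sketch:

```go
package main

import (
	"crypto/md5"
	"fmt"
	"io"
	"io/ioutil"
	"strings"
)

func main() {
	body := strings.NewReader("file payload")
	md5Hash := md5.New()

	// every byte read from tee is also written into md5Hash
	tee := io.TeeReader(body, md5Hash)

	uploaded, _ := ioutil.ReadAll(tee) // stands in for the volume-server upload
	fmt.Printf("uploaded %d bytes, md5=%x\n", len(uploaded), md5Hash.Sum(nil))
}
```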
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index be0438efb..2b37e3c5d 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -3,15 +3,18 @@ package weed_server
 import (
 	"context"
 	"crypto/md5"
+	"fmt"
+	"hash"
 	"io"
 	"io/ioutil"
 	"net/http"
+	"os"
 	"path"
 	"strconv"
 	"strings"
 	"time"
 
-	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -21,11 +24,7 @@ import (
 )
 
 func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
-	replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool {
-	if r.Method != "POST" {
-		glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
-		return false
-	}
+	replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) {
 
 	// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
 	query := r.URL.Query()
@@ -35,54 +34,47 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
 	if maxMB <= 0 && fs.option.MaxMB > 0 {
 		maxMB = int32(fs.option.MaxMB)
 	}
-	if maxMB <= 0 {
-		glog.V(4).Infoln("AutoChunking not enabled")
-		return false
-	}
-	glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)")
 
 	chunkSize := 1024 * 1024 * maxMB
 
-	contentLength := int64(0)
-	if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 {
-		contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64)
-		if contentLength <= int64(chunkSize) {
-			glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.")
-			return false
-		}
-	}
+	stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
+	start := time.Now()
+	defer func() {
+		stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
+	}()
 
-	if contentLength <= 0 {
-		glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.")
-		return false
+	var reply *FilerPostResult
+	var err error
+	var md5bytes []byte
+	if r.Method == "POST" {
+		if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
+			reply, err = fs.mkdir(ctx, w, r)
+		} else {
+			reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
+		}
+	} else {
+		reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
 	}
-
-	reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
 	if err != nil {
 		writeJsonError(w, r, http.StatusInternalServerError, err)
 	} else if reply != nil {
+		if len(md5bytes) > 0 {
+			w.Header().Set("Content-MD5", util.Base64Encode(md5bytes))
+		}
 		writeJsonQuiet(w, r, http.StatusCreated, reply)
 	}
-	return true
 }
 
-func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
-	contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) {
-
-	stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
-	start := time.Now()
-	defer func() {
-		stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
-	}()
+func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
 
 	multipartReader, multipartReaderErr := r.MultipartReader()
 	if multipartReaderErr != nil {
-		return nil, multipartReaderErr
+		return nil, nil, multipartReaderErr
 	}
 
 	part1, part1Err := multipartReader.NextPart()
 	if part1Err != nil {
-		return nil, part1Err
+		return nil, nil, part1Err
 	}
 
 	fileName := part1.FileName()
@@ -90,54 +82,63 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
 		fileName = path.Base(fileName)
 	}
 	contentType := part1.Header.Get("Content-Type")
+	if contentType == "application/octet-stream" {
+		contentType = ""
+	}
 
-	var fileChunks []*filer_pb.FileChunk
-
-	md5Hash := md5.New()
-	var partReader = ioutil.NopCloser(io.TeeReader(part1, md5Hash))
-
-	chunkOffset := int64(0)
-
-	for chunkOffset < contentLength {
-		limitedReader := io.LimitReader(partReader, int64(chunkSize))
-
-		// assign one file id for one chunk
-		fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
-		if assignErr != nil {
-			return nil, assignErr
-		}
+	fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync)
+	if err != nil {
+		return nil, nil, err
+	}
 
-		// upload the chunk to the volume server
-		uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
-		if uploadErr != nil {
-			return nil, uploadErr
-		}
+	fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
+	if replyerr != nil {
+		glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+		return
+	}
 
-		// if last chunk exhausted the reader exactly at the border
-		if uploadResult.Size == 0 {
-			break
-		}
+	md5bytes = md5Hash.Sum(nil)
+	filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset)
 
-		// Save to chunk manifest structure
-		fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
+	return
+}
 
-		glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
+func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
 
-		// reset variables for the next chunk
-		chunkOffset = chunkOffset + int64(uploadResult.Size)
+	fileName := ""
+	contentType := ""
 
-		// if last chunk was not at full chunk size, but already exhausted the reader
-		if int64(uploadResult.Size) < int64(chunkSize) {
-			break
-		}
+	fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync)
+	if err != nil {
+		return nil, nil, err
 	}
 
-	fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
+	fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
 	if replyerr != nil {
 		glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
 		return
 	}
 
+	md5bytes = md5Hash.Sum(nil)
+	filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset)
+
+	return
+}
+
+func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) {
+
+	// detect file mode
+	modeStr := r.URL.Query().Get("mode")
+	if modeStr == "" {
+		modeStr = "0660"
+	}
+	mode, err := strconv.ParseUint(modeStr, 8, 32)
+	if err != nil {
+		glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+		mode = 0660
+	}
+
+	// fix the path
 	path := r.URL.Path
 	if strings.HasSuffix(path, "/") {
 		if fileName != "" {
@@ -145,20 +146,28 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
 		}
 	}
 
+	// fix the crTime
+	existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+	crTime := time.Now()
+	if err == nil && existingEntry != nil {
+		crTime = existingEntry.Crtime
+	}
+
 	glog.V(4).Infoln("saving", path)
-	entry := &filer2.Entry{
+	entry := &filer.Entry{
 		FullPath: util.FullPath(path),
-		Attr: filer2.Attr{
+		Attr: filer.Attr{
 			Mtime:       time.Now(),
-			Crtime:      time.Now(),
-			Mode:        0660,
+			Crtime:      crTime,
+			Mode:        os.FileMode(mode),
 			Uid:         OS_UID,
 			Gid:         OS_GID,
 			Replication: replication,
 			Collection:  collection,
 			TtlSec:      ttlSec,
 			Mime:        contentType,
-			Md5:         md5Hash.Sum(nil),
+			Md5:         md5bytes,
+			FileSize:    uint64(chunkOffset),
 		},
 		Chunks: fileChunks,
 	}
@@ -168,15 +177,57 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
 		Size: chunkOffset,
 	}
 
-	if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil {
+	if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
 		fs.filer.DeleteChunks(entry.Chunks)
 		replyerr = dbErr
 		filerResult.Error = dbErr.Error()
 		glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
-		return
 	}
+	return filerResult, replyerr
+}
 
-	return
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, replication string, collection string, dataCenter string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) {
+	var fileChunks []*filer_pb.FileChunk
+
+	md5Hash := md5.New()
+	var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+
+	chunkOffset := int64(0)
+
+	for {
+		limitedReader := io.LimitReader(partReader, int64(chunkSize))
+
+		// assign one file id for one chunk
+		fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
+		if assignErr != nil {
+			return nil, nil, 0, assignErr
+		}
+
+		// upload the chunk to the volume server
+		uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
+		if uploadErr != nil {
+			return nil, nil, 0, uploadErr
+		}
+
+		// if last chunk exhausted the reader exactly at the border
+		if uploadResult.Size == 0 {
+			break
+		}
+
+		// Save to chunk manifest structure
+		fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
+
+		glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
+
+		// reset variables for the next chunk
+		chunkOffset = chunkOffset + int64(uploadResult.Size)
+
+		// if last chunk was not at full chunk size, but already exhausted the reader
+		if int64(uploadResult.Size) < int64(chunkSize) {
+			break
+		}
+	}
+	return fileChunks, md5Hash, chunkOffset, nil
 }
 
 func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) {
@@ -191,7 +242,7 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
 	return uploadResult, err
 }
 
-func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, ttlString string, fsync bool) filer2.SaveDataAsChunkFunctionType {
+func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, ttlString string, fsync bool) filer.SaveDataAsChunkFunctionType {
 
 	return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) {
 
 		// assign one file id for one chunk
@@ -210,3 +261,51 @@ func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCe
 	}
 }
 
+func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http.Request) (filerResult *FilerPostResult, replyerr error) {
+
+	// detect file mode
+	modeStr := r.URL.Query().Get("mode")
+	if modeStr == "" {
+		modeStr = "0660"
+	}
+	mode, err := strconv.ParseUint(modeStr, 8, 32)
+	if err != nil {
+		glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+		mode = 0660
+	}
+
+	// fix the path
+	path := r.URL.Path
+	if strings.HasSuffix(path, "/") {
+		path = path[:len(path)-1]
+	}
+
+	existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+	if err == nil && existingEntry != nil {
+		replyerr = fmt.Errorf("dir %s already exists", path)
+		return
+	}
+
+	glog.V(4).Infoln("mkdir", path)
+	entry := &filer.Entry{
+		FullPath: util.FullPath(path),
+		Attr: filer.Attr{
+			Mtime:  time.Now(),
+			Crtime: time.Now(),
+			Mode:   os.FileMode(mode) | os.ModeDir,
+			Uid:    OS_UID,
+			Gid:    OS_GID,
+		},
+	}
+
+	filerResult = &FilerPostResult{
+		Name: util.FullPath(path).Name(),
+	}
+
+	if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
+		replyerr = dbErr
+		filerResult.Error = dbErr.Error()
+		glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr)
+	}
+	return filerResult, replyerr
+}
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 8413496b8..60082a8d4 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -7,7 +7,7 @@ import (
 	"strings"
 	"time"
 
-	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -58,9 +58,9 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
 		}
 	}
 
-	entry := &filer2.Entry{
+	entry := &filer.Entry{
 		FullPath: util.FullPath(path),
-		Attr: filer2.Attr{
+		Attr: filer.Attr{
 			Mtime:  time.Now(),
 			Crtime: time.Now(),
 			Mode:   0660,
@@ -70,6 +70,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
 			Collection:  collection,
 			TtlSec:      ttlSeconds,
 			Mime:        pu.MimeType,
+			Md5:         util.Base64Md5ToBytes(pu.ContentMd5),
 		},
 		Chunks: fileChunks,
 	}
@@ -79,7 +80,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
 		Size: int64(pu.OriginalDataSize),
 	}
 
-	if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil {
+	if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
 		fs.filer.DeleteChunks(entry.Chunks)
 		err = dbErr
 		filerResult.Error = dbErr.Error()
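uploadReaderToChunks above is the heart of the now always-on auto-chunking: it slices the request body into chunkSize pieces with io.LimitReader and stops when a chunk comes back empty (the previous chunk ended exactly at the border) or short (the reader is exhausted). A self-contained sketch of that termination logic, "uploading" into an in-memory buffer:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
)

func splitIntoChunks(r io.Reader, chunkSize int64) (sizes []int64, total int64) {
	for {
		var buf bytes.Buffer
		n, _ := io.Copy(&buf, io.LimitReader(r, chunkSize)) // one chunk's worth

		if n == 0 {
			break // the previous chunk ended exactly at the border
		}
		sizes = append(sizes, n)
		total += n

		if n < chunkSize {
			break // short read: the reader is exhausted
		}
	}
	return sizes, total
}

func main() {
	sizes, total := splitIntoChunks(strings.NewReader("0123456789"), 4)
	fmt.Println(sizes, total) // [4 4 2] 10
}
```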
diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go
index e532b27e2..04a81433b 100644
--- a/weed/server/filer_ui/templates.go
+++ b/weed/server/filer_ui/templates.go
@@ -3,10 +3,19 @@ package master_ui
 import (
 	"github.com/dustin/go-humanize"
 	"html/template"
+	"net/url"
+	"strings"
 )
 
+func printpath(parts ...string) string {
+	concat := strings.Join(parts, "")
+	escaped := url.PathEscape(concat)
+	return strings.ReplaceAll(escaped, "%2F", "/")
+}
+
 var funcMap = template.FuncMap{
 	"humanizeBytes": humanize.Bytes,
+	"printpath":     printpath,
 }
 
 var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html>
@@ -50,7 +59,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
 		<div class="row">
 			<div>
 				{{ range $entry := .Breadcrumbs }}
-				<a href="{{ $entry.Link }}" >
+				<a href="{{ printpath $entry.Link }}" >
 					{{ $entry.Name }}
 				</a>
 				{{ end }}
@@ -69,11 +78,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
 				<td>
 					{{if $entry.IsDirectory}}
 					<img src="/seaweedfsstatic/images/folder.gif" width="20" height="23">
-					<a href={{ print $path "/" $entry.Name "/"}} >
+					<a href="{{ printpath $path "/" $entry.Name "/"}}" >
 						{{ $entry.Name }}
 					</a>
 					{{else}}
-					<a href={{ print $path "/" $entry.Name }} >
+					<a href="{{ printpath $path "/" $entry.Name }}" >
 						{{ $entry.Name }}
 					</a>
 					{{end}}
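printpath above escapes each assembled path for use inside an href but puts the "/" separators back, so directory links stay readable while spaces, quotes, and '#' no longer break the attribute or truncate the URL. The same function, exercised standalone:

```go
package main

import (
	"fmt"
	"net/url"
	"strings"
)

func printpath(parts ...string) string {
	concat := strings.Join(parts, "")
	escaped := url.PathEscape(concat) // escapes "/" as %2F along with the rest
	return strings.ReplaceAll(escaped, "%2F", "/")
}

func main() {
	fmt.Println(printpath("/buckets", "/", "my photos", "/", "a#1.jpg"))
	// Output: /buckets/my%20photos/a%231.jpg
}
```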
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 1ee214deb..692909a29 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -12,21 +12,19 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
-	"github.com/chrislusf/seaweedfs/weed/storage/backend"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/topology"
 )
 
 func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
 	var dn *topology.DataNode
-	t := ms.Topo
 
 	defer func() {
 		if dn != nil {
 			// if the volume server disconnects and reconnects quickly
 			// the unregister and register can race with each other
-			t.UnRegisterDataNode(dn)
+			ms.Topo.UnRegisterDataNode(dn)
 			glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
 
 			message := &master_pb.VolumeLocation{
@@ -62,21 +60,18 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 			return err
 		}
 
-		t.Sequence.SetMax(heartbeat.MaxFileKey)
+		ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
 
 		if dn == nil {
-			dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
-			dc := t.GetOrCreateDataCenter(dcName)
+			dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+			dc := ms.Topo.GetOrCreateDataCenter(dcName)
 			rack := dc.GetOrCreateRack(rackName)
 			dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, int64(heartbeat.MaxVolumeCount))
 			glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
 			if err := stream.Send(&master_pb.HeartbeatResponse{
-				VolumeSizeLimit:        uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
-				MetricsAddress:         ms.option.MetricsAddress,
-				MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
-				StorageBackends:        backend.ToPbStorageBackends(),
+				VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
 			}); err != nil {
 				glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
 				return err
@@ -102,12 +97,12 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 				message.DeletedVids = append(message.DeletedVids, volInfo.Id)
 			}
 			// update master internal volume layouts
-			t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
+			ms.Topo.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
 		}
 
 		if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
 			// process heartbeat.Volumes
-			newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+			newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
 
 			for _, v := range newVolumes {
 				glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
@@ -122,7 +117,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 
 		if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
 			// update master internal volume layouts
-			t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
+			ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
 
 			for _, s := range heartbeat.NewEcShards {
 				message.NewVids = append(message.NewVids, s.Id)
@@ -137,8 +132,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 		}
 
 		if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
-			glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
-			newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
+			glog.V(1).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
+			newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn)
 
 			// broadcast the ec vid changes to master clients
 			for _, s := range newShards {
@@ -163,7 +158,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
 		}
 
 		// tell the volume servers about the leader
-		newLeader, err := t.Leader()
+		newLeader, err := ms.Topo.Leader()
 		if err != nil {
 			glog.Warningf("SendHeartbeat find leader: %v", err)
 			return err
@@ -192,7 +187,8 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
 
 	peerAddress := findClientAddress(stream.Context(), req.GrpcPort)
 
-	stopChan := make(chan bool)
+	// buffer by 1 so we don't end up getting stuck writing to stopChan forever
+	stopChan := make(chan bool, 1)
 
 	clientName, messageChan := ms.addClient(req.Name, peerAddress)
 
@@ -252,7 +248,12 @@ func (ms *MasterServer) addClient(clientType string, clientAddress string) (clie
 	clientName = clientType + "@" + clientAddress
 	glog.V(0).Infof("+ client %v", clientName)
 
-	messageChan = make(chan *master_pb.VolumeLocation)
+	// we buffer this because otherwise we end up in a potential deadlock where
+	// the KeepConnected loop is no longer listening on this channel but we're
+	// trying to send to it in SendHeartbeat and so we can't lock the
+	// clientChansLock to remove the channel and we're stuck writing to it
+	// 100 is probably overkill
+	messageChan = make(chan *master_pb.VolumeLocation, 100)
 
 	ms.clientChansLock.Lock()
 	ms.clientChans[clientName] = messageChan
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 282c75679..6a320dfb1 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -3,6 +3,7 @@ package weed_server
 
 import (
 	"context"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/storage/backend"
 
 	"github.com/chrislusf/raft"
 
@@ -184,6 +185,8 @@ func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_
 	resp := &master_pb.GetMasterConfigurationResponse{
 		MetricsAddress:         ms.option.MetricsAddress,
 		MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+		StorageBackends:        backend.ToPbStorageBackends(),
+		DefaultReplication:     ms.option.DefaultReplicaPlacement,
 	}
 
 	return resp, nil
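The two channel changes above trade unbuffered channels for buffered ones so a sender holding a lock can never block forever on a receiver that already went away. A small demonstration of why the buffer matters (the buffered send proceeds even though nobody is receiving yet):

```go
package main

import (
	"fmt"
	"time"
)

func main() {
	messageChan := make(chan string, 100) // buffered, as in addClient

	// no receiver yet: with an unbuffered channel this send would block, and
	// a sender holding clientChansLock would deadlock the server
	messageChan <- "volume location update"
	fmt.Println("sent without blocking")

	go func() {
		for msg := range messageChan {
			fmt.Println("received:", msg)
		}
	}()
	time.Sleep(100 * time.Millisecond) // let the consumer drain
	close(messageChan)
}
```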
"net/url" "os" "regexp" - "strconv" "strings" "sync" "time" @@ -210,7 +209,7 @@ func (ms *MasterServer) startAdminScripts() { scriptLines = append(scriptLines, "unlock") } - masterAddress := "localhost:" + strconv.Itoa(ms.option.Port) + masterAddress := fmt.Sprintf("%s:%d", ms.option.Host, ms.option.Port) var shellOptions shell.ShellOptions shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master") diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 7595c0171..34235384f 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -110,7 +110,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) } else { url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path } - http.Redirect(w, r, url, http.StatusMovedPermanently) + http.Redirect(w, r, url, http.StatusPermanentRedirect) } else { writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error)) } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index eaf5aaf6e..9296c63e9 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -149,7 +149,35 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv } return resp, err +} + +func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) { + + resp := &volume_server_pb.VolumeMarkWritableResponse{} + + err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId)) + + if err != nil { + glog.Errorf("volume mark writable %v: %v", req, err) + } else { + glog.V(2).Infof("volume mark writable %v", req) + } + + return resp, err +} + +func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) { + resp := &volume_server_pb.VolumeStatusResponse{} + + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("not found volume id %d", req.VolumeId) + } + + resp.IsReadOnly = v.IsReadOnly() + + return resp, nil } func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) { @@ -168,6 +196,16 @@ func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_serv } +func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) { + + resp := &volume_server_pb.VolumeServerLeaveResponse{} + + vs.StopHeartbeat() + + return resp, nil + +} + func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) { resp := &volume_server_pb.VolumeNeedleStatusResponse{} @@ -188,7 +226,7 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv } count, err = vs.store.ReadEcShardNeedle(volumeId, n) } else { - count, err = vs.store.ReadVolumeNeedle(volumeId, n) + count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil) } if err != nil { return nil, err @@ -199,7 +237,7 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv resp.NeedleId = uint64(n.Id) resp.Cookie = uint32(n.Cookie) - resp.Size = n.Size + resp.Size = uint32(n.Size) resp.LastModified = 
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index eaf5aaf6e..9296c63e9 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -149,7 +149,35 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
 	}
 	return resp, err
+}
+
+func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
+
+	resp := &volume_server_pb.VolumeMarkWritableResponse{}
+
+	err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
+
+	if err != nil {
+		glog.Errorf("volume mark writable %v: %v", req, err)
+	} else {
+		glog.V(2).Infof("volume mark writable %v", req)
+	}
+
+	return resp, err
+}
+
+func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) {
+
+	resp := &volume_server_pb.VolumeStatusResponse{}
+
+	v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+	if v == nil {
+		return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+	}
+
+	resp.IsReadOnly = v.IsReadOnly()
+
+	return resp, nil
 }
 
 func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
@@ -168,6 +196,16 @@ func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_serv
 
 }
 
+func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) {
+
+	resp := &volume_server_pb.VolumeServerLeaveResponse{}
+
+	vs.StopHeartbeat()
+
+	return resp, nil
+
+}
+
 func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {
 
 	resp := &volume_server_pb.VolumeNeedleStatusResponse{}
@@ -188,7 +226,7 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
 		}
 		count, err = vs.store.ReadEcShardNeedle(volumeId, n)
 	} else {
-		count, err = vs.store.ReadVolumeNeedle(volumeId, n)
+		count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil)
 	}
 	if err != nil {
 		return nil, err
@@ -199,7 +237,7 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
 
 	resp.NeedleId = uint64(n.Id)
 	resp.Cookie = uint32(n.Cookie)
-	resp.Size = n.Size
+	resp.Size = uint32(n.Size)
 	resp.LastModified = n.LastModified
 	resp.Crc = n.Checksum.Value()
 	if n.HasTtl() {
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go
index 501964191..8e84dc2a8 100644
--- a/weed/server/volume_grpc_batch_delete.go
+++ b/weed/server/volume_grpc_batch_delete.go
@@ -41,7 +41,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
 		} else {
 			n.ParsePath(id_cookie)
 			cookie := n.Cookie
-			if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
+			if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
 				resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
 					FileId: fid,
 					Status: http.StatusNotFound,
@@ -79,7 +79,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
 			resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
 				FileId: fid,
 				Status: http.StatusAccepted,
-				Size:   size},
+				Size:   uint32(size)},
 			)
 		}
 	}
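Note: VolumeMarkWritable, VolumeStatus, and VolumeServerLeave are new RPCs on the volume server. A sketch of how a client might call the new VolumeStatus endpoint, assuming the stubs generated from this commit's volume_server_pb proto; the dial address and volume id are placeholders:

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func main() {
	// the volume server's gRPC port; illustrative only
	conn, err := grpc.Dial("localhost:18080", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := volume_server_pb.NewVolumeServerClient(conn)
	resp, err := client.VolumeStatus(context.Background(),
		&volume_server_pb.VolumeStatusRequest{VolumeId: 3})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Println("volume 3 read-only:", resp.IsReadOnly)
}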
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 7cb836344..8698a4c64 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,7 +2,7 @@ package weed_server
 
 import (
 	"fmt"
-	"net"
+	"github.com/chrislusf/seaweedfs/weed/operation"
 	"time"
 
 	"google.golang.org/grpc"
@@ -22,6 +22,31 @@ import (
 func (vs *VolumeServer) GetMaster() string {
 	return vs.currentMaster
 }
+
+func (vs *VolumeServer) checkWithMaster() (err error) {
+	isConnected := false
+	for !isConnected {
+		for _, master := range vs.SeedMasterNodes {
+			err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+				resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+				if err != nil {
+					return fmt.Errorf("get master %s configuration: %v", master, err)
+				}
+				vs.metricsAddress, vs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+				backend.LoadFromPbStorageBackends(resp.StorageBackends)
+				return nil
+			})
+			if err == nil {
+				return
+			} else {
+				glog.V(0).Infof("checkWithMaster %s: %v", master, err)
+			}
+		}
+		time.Sleep(1790 * time.Millisecond)
+	}
+	return
+}
+
 func (vs *VolumeServer) heartbeat() {
 
 	glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes)
@@ -32,7 +57,7 @@ func (vs *VolumeServer) heartbeat() {
 	var err error
 	var newLeader string
-	for {
+	for vs.isHeartbeating {
 		for _, master := range vs.SeedMasterNodes {
 			if newLeader != "" {
 				// the new leader may actually is the same master
@@ -53,20 +78,35 @@ func (vs *VolumeServer) heartbeat() {
 				newLeader = ""
 				vs.store.MasterAddress = ""
 			}
+			if !vs.isHeartbeating {
+				break
+			}
 		}
 	}
 }
 
+func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
+	if !vs.isHeartbeating {
+		return true
+	}
+	vs.isHeartbeating = false
+	vs.stopChan <- true
+	return false
+}
+
 func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
 
-	grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
 	if err != nil {
 		return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
 	}
 	defer grpcConection.Close()
 
 	client := master_pb.NewSeaweedClient(grpcConection)
-	stream, err := client.SendHeartbeat(context.Background())
+	stream, err := client.SendHeartbeat(ctx)
 	if err != nil {
 		glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
 		return "", err
@@ -87,23 +127,16 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
 			vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
 			if vs.store.MaybeAdjustVolumeMax() {
 				if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
-					glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+					glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
 				}
 			}
 		}
-		if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) {
-			glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
+		if in.GetLeader() != "" && vs.currentMaster != in.GetLeader() {
+			glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
 			newLeader = in.GetLeader()
 			doneChan <- nil
 			return
 		}
-		if in.GetMetricsAddress() != "" && vs.MetricsAddress != in.GetMetricsAddress() {
-			vs.MetricsAddress = in.GetMetricsAddress()
-			vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds())
-		}
-		if len(in.StorageBackends) > 0 {
-			backend.LoadFromPbStorageBackends(in.StorageBackends)
-		}
 	}
 }()
 
@@ -182,19 +215,8 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
 		}
 	case err = <-doneChan:
 		return
+	case <-vs.stopChan:
+		return
 	}
 }
}
-
-func isSameIP(ip string, host string) bool {
-	ips, err := net.LookupIP(host)
-	if err != nil {
-		return false
-	}
-	for _, t := range ips {
-		if ip == t.String() {
-			return true
-		}
-	}
-	return false
-}
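Note: the new stop path threads vs.stopChan into the heartbeat select, so the VolumeServerLeave RPC can end the loop cleanly instead of the process looping forever. A minimal standalone version of the same pattern, with a ticker standing in for the real heartbeat stream; all names are illustrative:

package main

import (
	"fmt"
	"time"
)

func heartbeatLoop(stop <-chan bool) {
	ticker := time.NewTicker(200 * time.Millisecond)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			fmt.Println("send heartbeat")
		case <-stop:
			fmt.Println("heartbeat stopped")
			return
		}
	}
}

func main() {
	stop := make(chan bool, 1) // buffered, so the stopper never blocks
	go heartbeatLoop(stop)
	time.Sleep(time.Second)
	stop <- true
	time.Sleep(100 * time.Millisecond) // let the goroutine print its exit line
}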
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 5c7d5572c..cd2b53c8a 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -27,17 +27,12 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
 		glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId)
 
-		err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
-		if err != nil {
-			return nil, fmt.Errorf("failed to mount existing volume %d: %v", req.VolumeId, err)
-		}
-
-		err = vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
+		err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
 		if err != nil {
 			return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
 		}
 
-		glog.V(0).Infof("deleted exisitng volume %d before copying.", req.VolumeId)
+		glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
 	}
 
 	location := vs.store.FindFreeLocation()
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 79348c9d7..55e0261c8 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -272,7 +272,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
 
 	if req.FileKey != 0 {
 		_, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
-		if size == types.TombstoneFileSize {
+		if size.IsDeleted() {
 			return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
 				IsDeleted: true,
 			})
@@ -340,7 +340,7 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
 	if err != nil {
 		return nil, fmt.Errorf("locate in local ec volume: %v", err)
 	}
-	if size == types.TombstoneFileSize {
+	if size.IsDeleted() {
 		return resp, nil
 	}
diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go
index 767e28e7b..2f4fab96a 100644
--- a/weed/server/volume_grpc_query.go
+++ b/weed/server/volume_grpc_query.go
@@ -24,7 +24,7 @@ func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_
 		n.ParsePath(id_cookie)
 		cookie := n.Cookie
 
-		if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
+		if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
 			glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
 			return err
 		}
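Note: the erasure-coding hunks stop comparing needle sizes against the sentinel types.TombstoneFileSize and instead ask the size type itself via size.IsDeleted(), which keeps the tombstone encoding in one place. A sketch of that refactor, assuming a negative sentinel marks a tombstone; the real types package may encode deletion differently:

package main

import "fmt"

type Size int32

const tombstone Size = -1 // assumed sentinel; illustrative only

// IsDeleted hides the sentinel so callers never repeat the raw comparison.
func (s Size) IsDeleted() bool { return s == tombstone }

func main() {
	for _, s := range []Size{1024, tombstone} {
		fmt.Printf("size=%d deleted=%v\n", s, s.IsDeleted())
	}
}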
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 3af37b491..83df32fdd 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -28,9 +28,11 @@ type VolumeServer struct {
 	FixJpgOrientation       bool
 	ReadRedirect            bool
 	compactionBytePerSecond int64
-	MetricsAddress          string
-	MetricsIntervalSec      int
+	metricsAddress          string
+	metricsIntervalSec      int
 	fileSizeLimitBytes      int64
+	isHeartbeating          bool
+	stopChan                chan bool
 }
 
 func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -66,16 +68,21 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 		grpcDialOption:          security.LoadClientTLS(util.GetViper(), "grpc.volume"),
 		compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
 		fileSizeLimitBytes:      int64(fileSizeLimitMB) * 1024 * 1024,
+		isHeartbeating:          true,
+		stopChan:                make(chan bool),
 	}
 	vs.SeedMasterNodes = masterNodes
+
+	vs.checkWithMaster()
+
 	vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind)
 	vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
 
 	handleStaticResources(adminMux)
+	adminMux.HandleFunc("/status", vs.statusHandler)
 	if signingKey == "" || enableUiAccess {
 		// only expose the volume server details for safe environments
 		adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
-		adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
 		/*
 			adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
 			adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
@@ -90,11 +97,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 	}
 
 	go vs.heartbeat()
-	hostAddress := fmt.Sprintf("%s:%d", ip, port)
-	go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather,
-		func() (addr string, intervalSeconds int) {
-			return vs.MetricsAddress, vs.MetricsIntervalSec
-		})
+	go stats.LoopPushingMetric("volumeServer", fmt.Sprintf("%s:%d", ip, port), vs.metricsAddress, vs.metricsIntervalSec)
 
 	return vs
 }
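Note: because checkWithMaster() now fetches the metrics configuration once at startup (before the store is created, since the storage backends also come from the master), the metrics push loop can take plain values instead of a getter callback. A standalone sketch of such a fixed-configuration push loop; the push itself is stubbed out and all names are illustrative:

package main

import (
	"fmt"
	"time"
)

func loopPushingMetric(name, instance, addr string, intervalSeconds int) {
	if addr == "" || intervalSeconds <= 0 {
		return // metrics not configured; nothing to push
	}
	for {
		// a real implementation would push a metrics registry to addr here
		fmt.Printf("push %s metrics for %s to %s\n", name, instance, addr)
		time.Sleep(time.Duration(intervalSeconds) * time.Second)
	}
}

func main() {
	go loopPushingMetric("volumeServer", "localhost:8080", "pushgateway:9091", 15)
	time.Sleep(16 * time.Second)
}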
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 14ad27d42..ad13cdf3b 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -1,6 +1,7 @@
 package weed_server
 
 import (
+	"github.com/chrislusf/seaweedfs/weed/util"
 	"net/http"
 	"strings"
 
@@ -25,6 +26,7 @@ security settings:
 */
 
 func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
 	switch r.Method {
 	case "GET", "HEAD":
 		stats.ReadRequest()
@@ -39,6 +41,7 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
 }
 
 func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
 	switch r.Method {
 	case "GET":
 		stats.ReadRequest()
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 34655d833..4d84c9c4d 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -10,6 +10,7 @@ import (
 )
 
 func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
 	m := make(map[string]interface{})
 	m["Version"] = util.Version()
 	var ds []*volume_server_pb.DiskStatus
@@ -24,6 +25,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
 }
 
 func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
 	m := make(map[string]interface{})
 	m["Version"] = util.Version()
 	var ds []*volume_server_pb.DiskStatus
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index d730600e4..bb04678d6 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -18,6 +18,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/images"
 	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/stats"
+	"github.com/chrislusf/seaweedfs/weed/storage"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
@@ -81,15 +82,20 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 		return
 	}
 	cookie := n.Cookie
+
+	readOption := &storage.ReadOption{
+		ReadDeleted: r.FormValue("readDeleted") == "true",
+	}
+
 	var count int
 	if hasVolume {
-		count, err = vs.store.ReadVolumeNeedle(volumeId, n)
+		count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption)
 	} else if hasEcVolume {
 		count, err = vs.store.ReadEcShardNeedle(volumeId, n)
 	}
 	// glog.V(4).Infoln("read bytes", count, "error", err)
 	if err != nil || count < 0 {
-		glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
+		glog.V(3).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
 		w.WriteHeader(http.StatusNotFound)
 		return
 	}
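Note: this commit stamps the Server header at the top of each individual handler. The same effect can also come from a single wrapper around the mux; the sketch below shows that alternative for comparison, with a hard-coded version string standing in for util.VERSION:

package main

import (
	"fmt"
	"net/http"
)

const serverID = "SeaweedFS Volume x.y" // illustrative; the real code uses util.VERSION

func withServerHeader(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Server", serverID) // must run before any handler writes the status line
		next.ServeHTTP(w, r)
	})
}

func main() {
	mux := http.NewServeMux()
	mux.HandleFunc("/status", func(w http.ResponseWriter, r *http.Request) {
		fmt.Fprintln(w, "ok")
	})
	_ = http.ListenAndServe(":8080", withServerHeader(mux))
}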
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index 8b2027e7b..e535327e2 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -13,6 +13,7 @@ import (
 )
 
 func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
 	infos := make(map[string]interface{})
 	infos["Up Time"] = time.Now().Sub(startTime).String()
 	var ds []*volume_server_pb.DiskStatus
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 74dad28de..78cbf08c5 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -42,7 +42,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	reqNeedle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
+	reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
 	if ne != nil {
 		writeJsonError(w, r, http.StatusBadRequest, ne)
 		return
@@ -70,6 +70,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
 	ret.ETag = reqNeedle.Etag()
 	ret.Mime = string(reqNeedle.Mime)
 	setEtag(w, ret.ETag)
+	w.Header().Set("Content-MD5", contentMd5)
 	writeJsonQuiet(w, r, httpStatus, ret)
 }
 
@@ -103,7 +104,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	_, ok := vs.store.ReadVolumeNeedle(volumeId, n)
+	_, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil)
 	if ok != nil {
 		m := make(map[string]uint32)
 		m["size"] = 0
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 8655daf70..f13e73a7b 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -10,7 +10,6 @@ import (
 	"strings"
 	"time"
 
-	"github.com/chrislusf/seaweedfs/weed/util/grace"
 	"golang.org/x/net/webdav"
 	"google.golang.org/grpc"
 
@@ -20,7 +19,7 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/util"
 	"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
 
-	"github.com/chrislusf/seaweedfs/weed/filer2"
+	"github.com/chrislusf/seaweedfs/weed/filer"
 	"github.com/chrislusf/seaweedfs/weed/glog"
 	"github.com/chrislusf/seaweedfs/weed/security"
 )
@@ -42,7 +41,7 @@ type WebDavOption struct {
 type WebDavServer struct {
 	option         *WebDavOption
 	secret         security.SigningKey
-	filer          *filer2.Filer
+	filer          *filer.Filer
 	grpcDialOption grpc.DialOption
 	Handler        *webdav.Handler
 }
@@ -68,9 +67,10 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
 type WebDavFileSystem struct {
 	option         *WebDavOption
 	secret         security.SigningKey
-	filer          *filer2.Filer
+	filer          *filer.Filer
 	grpcDialOption grpc.DialOption
-	chunkCache     *chunk_cache.ChunkCache
+	chunkCache     *chunk_cache.TieredChunkCache
+	signature      int32
 }
 
 type FileInfo struct {
@@ -94,19 +94,17 @@ type WebDavFile struct {
 	isDirectory    bool
 	off            int64
 	entry          *filer_pb.Entry
-	entryViewCache []filer2.VisibleInterval
+	entryViewCache []filer.VisibleInterval
 	reader         io.ReaderAt
 }
 
 func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
 
-	chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB)
-	grace.OnInterrupt(func() {
-		chunkCache.Shutdown()
-	})
+	chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB, 1024*1024)
 	return &WebDavFileSystem{
 		option:     option,
 		chunkCache: chunkCache,
+		signature:  util.RandomInt32(),
 	}, nil
}
 
@@ -169,6 +167,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
 				Gid: fs.option.Gid,
 			},
 		},
+		Signatures: []int32{fs.signature},
 	}
 
 	glog.V(1).Infof("mkdir: %v", request)
@@ -220,6 +219,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
 				TtlSec: 0,
 			},
 		},
+		Signatures: []int32{fs.signature},
 	}); err != nil {
 		return fmt.Errorf("create %s: %v", fullFilePath, err)
 	}
@@ -259,7 +259,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string)
 
 	dir, name := util.FullPath(fullFilePath).DirAndName()
 
-	return filer_pb.Remove(fs, dir, name, true, false, false, false)
+	return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
 
}
 
@@ -338,7 +338,7 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F
 	if err != nil {
 		return nil, err
 	}
-	fi.size = int64(filer2.TotalSize(entry.GetChunks()))
+	fi.size = int64(filer.FileSize(entry))
 	fi.name = string(fullpath)
 	fi.mode = os.FileMode(entry.Attributes.FileMode)
 	fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
@@ -426,8 +426,9 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
 		f.entry.Attributes.Replication = replication
 
 		request := &filer_pb.UpdateEntryRequest{
-			Directory: dir,
-			Entry:     f.entry,
+			Directory:  dir,
+			Entry:      f.entry,
+			Signatures: []int32{f.fs.signature},
 		}
 
 		if _, err := client.UpdateEntry(ctx, request); err != nil {
@@ -470,16 +471,17 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
 	if err != nil {
 		return 0, err
 	}
-	if len(f.entry.Chunks) == 0 {
+	fileSize := int64(filer.FileSize(f.entry))
+	if fileSize == 0 {
 		return 0, io.EOF
 	}
 	if f.entryViewCache == nil {
-		f.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(f.fs), f.entry.Chunks)
+		f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks)
 		f.reader = nil
 	}
 	if f.reader == nil {
-		chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32)
-		f.reader = filer2.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache)
+		chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
+		f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize)
 	}
 
 	readSize, err = f.reader.ReadAt(p, f.off)
@@ -507,7 +509,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
 	err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
 		fi := FileInfo{
-			size:          int64(filer2.TotalSize(entry.GetChunks())),
+			size:          int64(filer.FileSize(entry)),
 			name:          entry.Name,
 			mode:          os.FileMode(entry.Attributes.FileMode),
 			modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
@@ -550,9 +552,9 @@ func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
 
 	var err error
 	switch whence {
-	case 0:
+	case io.SeekStart:
 		f.off = 0
-	case 2:
+	case io.SeekEnd:
 		if fi, err := f.fs.stat(ctx, f.name); err != nil {
 			return 0, err
 		} else {
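Note: the Seek cleanup at the end swaps the magic whence values 0 and 2 for the standard library's named constants io.SeekStart and io.SeekEnd, which mean exactly the same thing. A tiny standalone demonstration:

package main

import (
	"fmt"
	"io"
	"strings"
)

func main() {
	r := strings.NewReader("seaweedfs")
	pos, _ := r.Seek(0, io.SeekStart) // same as the old whence == 0
	fmt.Println("start:", pos)        // 0
	pos, _ = r.Seek(0, io.SeekEnd) // same as the old whence == 2
	fmt.Println("end:", pos)       // 9
}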
