diff options
Diffstat (limited to 'weed/server')
35 files changed, 1111 insertions, 890 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index bc6008864..44098a4b5 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -218,7 +218,7 @@ func handleStaticResources2(r *mux.Router) { r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(statikFS))) } -func adjustHeadersAfterHEAD(w http.ResponseWriter, r *http.Request, filename string) { +func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, filename string) { if filename != "" { contentDisposition := "inline" if r.FormValue("dl") != "" { diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 901f798f0..8f326f5c7 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -6,14 +6,14 @@ import ( "os" "path/filepath" "strconv" - "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -32,11 +32,13 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L return &filer_pb.LookupDirectoryEntryResponse{ Entry: &filer_pb.Entry{ - Name: req.Name, - IsDirectory: entry.IsDirectory(), - Attributes: filer2.EntryAttributeToPb(entry), - Chunks: entry.Chunks, - Extended: entry.Extended, + Name: req.Name, + IsDirectory: entry.IsDirectory(), + Attributes: filer.EntryAttributeToPb(entry), + Chunks: entry.Chunks, + Extended: entry.Extended, + HardLinkId: entry.HardLinkId, + HardLinkCounter: entry.HardLinkCounter, }, }, nil } @@ -50,7 +52,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file limit = fs.option.DirListingLimit } - paginationLimit := filer2.PaginationSize + paginationLimit := filer.PaginationSize if limit < paginationLimit { paginationLimit = limit } @@ -58,7 +60,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file lastFileName := req.StartFromFileName includeLastFile := req.InclusiveStartFrom for limit > 0 { - entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit) + entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix) if err != nil { return err @@ -73,19 +75,15 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file lastFileName = entry.Name() - if req.Prefix != "" { - if !strings.HasPrefix(entry.Name(), req.Prefix) { - continue - } - } - if err := stream.Send(&filer_pb.ListEntriesResponse{ Entry: &filer_pb.Entry{ - Name: entry.Name(), - IsDirectory: entry.IsDirectory(), - Chunks: entry.Chunks, - Attributes: filer2.EntryAttributeToPb(entry), - Extended: entry.Extended, + Name: entry.Name(), + IsDirectory: entry.IsDirectory(), + Chunks: entry.Chunks, + Attributes: filer.EntryAttributeToPb(entry), + Extended: entry.Extended, + HardLinkId: entry.HardLinkId, + HardLinkCounter: entry.HardLinkCounter, }, }); err != nil { return err @@ -137,28 +135,43 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol return resp, nil } +func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) { + fid, err := needle.ParseFileIdFromString(fileId) + if err != nil { + return nil, err + } + locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId)) + if !found || len(locations) == 0 { + return nil, fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId) + } + for _, loc := range locations { + targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId)) + } + return +} + func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) { - glog.V(4).Infof("CreateEntry %v", req) + glog.V(4).Infof("CreateEntry %v/%v", req.Directory, req.Entry.Name) resp = &filer_pb.CreateEntryResponse{} - chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) - - if req.Entry.Attributes == nil { - glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name)) - resp.Error = fmt.Sprintf("can not create entry with empty attributes") - return + chunks, garbage, err2 := fs.cleanupChunks(nil, req.Entry) + if err2 != nil { + return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2) } - createErr := fs.filer.CreateEntry(ctx, &filer2.Entry{ - FullPath: util.JoinPath(req.Directory, req.Entry.Name), - Attr: filer2.PbToEntryAttribute(req.Entry.Attributes), - Chunks: chunks, - }, req.OExcl) + createErr := fs.filer.CreateEntry(ctx, &filer.Entry{ + FullPath: util.JoinPath(req.Directory, req.Entry.Name), + Attr: filer.PbToEntryAttribute(req.Entry.Attributes), + Chunks: chunks, + Extended: req.Entry.Extended, + HardLinkId: filer.HardLinkId(req.Entry.HardLinkId), + HardLinkCounter: req.Entry.HardLinkCounter, + }, req.OExcl, req.IsFromOtherCluster, req.Signatures) if createErr == nil { - fs.filer.DeleteChunks(garbages) + fs.filer.DeleteChunks(garbage) } else { glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr) resp.Error = createErr.Error() @@ -177,16 +190,18 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } - // remove old chunks if not included in the new ones - unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks) - - chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) + chunks, garbage, err2 := fs.cleanupChunks(entry, req.Entry) + if err2 != nil { + return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2) + } - newEntry := &filer2.Entry{ - FullPath: util.JoinPath(req.Directory, req.Entry.Name), - Attr: entry.Attr, - Extended: req.Entry.Extended, - Chunks: chunks, + newEntry := &filer.Entry{ + FullPath: util.JoinPath(req.Directory, req.Entry.Name), + Attr: entry.Attr, + Extended: req.Entry.Extended, + Chunks: chunks, + HardLinkId: filer.HardLinkId(req.Entry.HardLinkId), + HardLinkCounter: req.Entry.HardLinkCounter, } glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v", @@ -209,22 +224,51 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr } - if filer2.EqualEntry(entry, newEntry) { + if filer.EqualEntry(entry, newEntry) { return &filer_pb.UpdateEntryResponse{}, err } if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil { - fs.filer.DeleteChunks(unusedChunks) - fs.filer.DeleteChunks(garbages) + fs.filer.DeleteChunks(garbage) + + fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures) + } else { glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err) } - fs.filer.NotifyUpdateEvent(entry, newEntry, true) - return &filer_pb.UpdateEntryResponse{}, err } +func (fs *FilerServer) cleanupChunks(existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) { + + // remove old chunks if not included in the new ones + if existingEntry != nil { + garbage, err = filer.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks) + if err != nil { + return newEntry.Chunks, nil, fmt.Errorf("MinusChunks: %v", err) + } + } + + // files with manifest chunks are usually large and append only, skip calculating covered chunks + manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.Chunks) + + chunks, coveredChunks := filer.CompactFileChunks(fs.lookupFileId, nonManifestChunks) + garbage = append(garbage, coveredChunks...) + + if newEntry.Attributes != nil { + chunks, err = filer.MaybeManifestize(fs.saveAsChunk(newEntry.Attributes.Replication, newEntry.Attributes.Collection, "", "", needle.SecondsToTTL(newEntry.Attributes.TtlSec), false), chunks) + if err != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", err) + } + } + + chunks = append(chunks, manifestChunks...) + + return +} + func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) { glog.V(4).Infof("AppendToEntry %v", req) @@ -233,9 +277,9 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo var offset int64 = 0 entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath)) if err == filer_pb.ErrNotFound { - entry = &filer2.Entry{ + entry = &filer.Entry{ FullPath: fullpath, - Attr: filer2.Attr{ + Attr: filer.Attr{ Crtime: time.Now(), Mtime: time.Now(), Mode: os.FileMode(0644), @@ -244,7 +288,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo }, } } else { - offset = int64(filer2.TotalSize(entry.Chunks)) + offset = int64(filer.TotalSize(entry.Chunks)) } for _, chunk := range req.Chunks { @@ -254,7 +298,13 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo entry.Chunks = append(entry.Chunks, req.Chunks...) - err = fs.filer.CreateEntry(context.Background(), entry, false) + entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(entry.Replication, entry.Collection, "", "", needle.SecondsToTTL(entry.TtlSec), false), entry.Chunks) + if err != nil { + // not good, but should be ok + glog.V(0).Infof("MaybeManifestize: %v", err) + } + + err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil) return &filer_pb.AppendToEntryResponse{}, err } @@ -263,7 +313,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr glog.V(4).Infof("DeleteEntry %v", req) - err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData) + err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures) resp = &filer_pb.DeleteEntryResponse{} if err != nil { resp.Error = err.Error() @@ -277,7 +327,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol if req.TtlSec > 0 { ttlStr = strconv.Itoa(int(req.TtlSec)) } - collection, replication, _ := fs.detectCollection(req.ParentPath, req.Collection, req.Replication) + collection, replication, _ := fs.detectCollection(req.Path, req.Collection, req.Replication) var altRequest *operation.VolumeAssignRequest @@ -285,6 +335,10 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol if dataCenter == "" { dataCenter = fs.option.DataCenter } + rack := req.Rack + if rack == "" { + rack = fs.option.Rack + } assignRequest := &operation.VolumeAssignRequest{ Count: uint64(req.Count), @@ -292,14 +346,16 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol Collection: collection, Ttl: ttlStr, DataCenter: dataCenter, + Rack: rack, } - if dataCenter != "" { + if dataCenter != "" || rack != "" { altRequest = &operation.VolumeAssignRequest{ Count: uint64(req.Count), Replication: replication, Collection: collection, Ttl: ttlStr, DataCenter: "", + Rack: "", } } assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest) @@ -323,6 +379,28 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol }, nil } +func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.CollectionListRequest) (resp *filer_pb.CollectionListResponse, err error) { + + glog.V(4).Infof("CollectionList %v", req) + resp = &filer_pb.CollectionListResponse{} + + err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{ + IncludeNormalVolumes: req.IncludeNormalVolumes, + IncludeEcVolumes: req.IncludeEcVolumes, + }) + if err != nil { + return err + } + for _, c := range masterResp.Collections { + resp.Collections = append(resp.Collections, &filer_pb.Collection{Name: c.Name}) + } + return nil + }) + + return +} + func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) { glog.V(4).Infof("DeleteCollection %v", req) @@ -369,12 +447,15 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) { t := &filer_pb.GetFilerConfigurationResponse{ - Masters: fs.option.Masters, - Collection: fs.option.Collection, - Replication: fs.option.DefaultReplication, - MaxMb: uint32(fs.option.MaxMB), - DirBuckets: fs.filer.DirBucketsPath, - Cipher: fs.filer.Cipher, + Masters: fs.option.Masters, + Collection: fs.option.Collection, + Replication: fs.option.DefaultReplication, + MaxMb: uint32(fs.option.MaxMB), + DirBuckets: fs.filer.DirBucketsPath, + Cipher: fs.filer.Cipher, + Signature: fs.filer.Signature, + MetricsAddress: fs.metricsAddress, + MetricsIntervalSec: int32(fs.metricsIntervalSec), } glog.V(4).Infof("GetFilerConfiguration: %v", t) diff --git a/weed/server/filer_grpc_server_kv.go b/weed/server/filer_grpc_server_kv.go new file mode 100644 index 000000000..3cb47115e --- /dev/null +++ b/weed/server/filer_grpc_server_kv.go @@ -0,0 +1,42 @@ +package weed_server + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (fs *FilerServer) KvGet(ctx context.Context, req *filer_pb.KvGetRequest) (*filer_pb.KvGetResponse, error) { + + value, err := fs.filer.Store.KvGet(ctx, req.Key) + if err == filer.ErrKvNotFound { + return &filer_pb.KvGetResponse{}, nil + } + + if err != nil { + return &filer_pb.KvGetResponse{Error: err.Error()}, nil + } + + return &filer_pb.KvGetResponse{ + Value: value, + }, nil + +} + +// KvPut sets the key~value. if empty value, delete the kv entry +func (fs *FilerServer) KvPut(ctx context.Context, req *filer_pb.KvPutRequest) (*filer_pb.KvPutResponse, error) { + + if len(req.Value) == 0 { + if err := fs.filer.Store.KvDelete(ctx, req.Key); err != nil { + return &filer_pb.KvPutResponse{Error: err.Error()}, nil + } + } + + err := fs.filer.Store.KvPut(ctx, req.Key, req.Value) + if err != nil { + return &filer_pb.KvPutResponse{Error: err.Error()}, nil + } + + return &filer_pb.KvPutResponse{}, nil + +} diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go deleted file mode 100644 index 848a1fc3a..000000000 --- a/weed/server/filer_grpc_server_listen.go +++ /dev/null @@ -1,108 +0,0 @@ -package weed_server - -import ( - "fmt" - "strings" - "time" - - "github.com/golang/protobuf/proto" - - "github.com/chrislusf/seaweedfs/weed/filer2" - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error { - - peerAddress := findClientAddress(stream.Context(), 0) - - clientName := fs.addClient(req.ClientName, peerAddress) - - defer fs.deleteClient(clientName) - - lastReadTime := time.Unix(0, req.SinceNs) - glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - var processedTsNs int64 - - eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { - - // get complete path to the file or directory - var entryName string - if eventNotification.OldEntry != nil { - entryName = eventNotification.OldEntry.Name - } else if eventNotification.NewEntry != nil { - entryName = eventNotification.NewEntry.Name - } - - fullpath := util.Join(dirPath, entryName) - - // skip on filer internal meta logs - if strings.HasPrefix(fullpath, filer2.SystemLogDir) { - return nil - } - - if !strings.HasPrefix(fullpath, req.PathPrefix) { - return nil - } - - message := &filer_pb.SubscribeMetadataResponse{ - Directory: dirPath, - EventNotification: eventNotification, - TsNs: tsNs, - } - if err := stream.Send(message); err != nil { - glog.V(0).Infof("=> client %v: %+v", clientName, err) - return err - } - return nil - } - - eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { - event := &filer_pb.SubscribeMetadataResponse{} - if err := proto.Unmarshal(logEntry.Data, event); err != nil { - glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) - return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) - } - - if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil { - return err - } - - processedTsNs = logEntry.TsNs - - return nil - } - - if err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn); err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) - } - - if processedTsNs != 0 { - lastReadTime = time.Unix(0, processedTsNs) - } - - err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { - fs.listenersLock.Lock() - fs.listenersCond.Wait() - fs.listenersLock.Unlock() - return true - }, eachLogEntryFn) - - return err - -} - -func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) { - clientName = clientType + "@" + clientAddress - glog.V(0).Infof("+ listener %v", clientName) - return -} - -func (fs *FilerServer) deleteClient(clientName string) { - glog.V(0).Infof("- listener %v", clientName) -} - -func (fs *FilerServer) notifyMetaListeners() { - fs.listenersCond.Broadcast() -} diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 7029c3342..f9ddeb600 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -5,7 +5,7 @@ import ( "fmt" "path/filepath" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" @@ -43,7 +43,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom return &filer_pb.AtomicRenameEntryResponse{}, nil } -func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents) error { +func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error { if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events, func() error { if entry.IsDirectory() { @@ -59,7 +59,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e return nil } -func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents) error { +func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error { currentDirPath := oldParent.Child(entry.Name()) newDirPath := newParent.Child(newName) @@ -70,7 +70,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. includeLastFile := false for { - entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024) + entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "") if err != nil { return err } @@ -92,7 +92,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. return nil } -func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents, +func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents, moveFolderSubEntries func() error) error { oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName) @@ -105,12 +105,13 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat } // add to new directory - newEntry := &filer2.Entry{ + newEntry := &filer.Entry{ FullPath: newPath, Attr: entry.Attr, Chunks: entry.Chunks, + Extended: entry.Extended, } - createErr := fs.filer.CreateEntry(ctx, newEntry, false) + createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, nil) if createErr != nil { return createErr } @@ -124,7 +125,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat } // delete old entry - deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false) + deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, nil) if deleteErr != nil { return deleteErr } @@ -136,6 +137,6 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat } type MoveEvents struct { - oldEntries []*filer2.Entry - newEntries []*filer2.Entry + oldEntries []*filer.Entry + newEntries []*filer.Entry } diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go new file mode 100644 index 000000000..634fb5211 --- /dev/null +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -0,0 +1,181 @@ +package weed_server + +import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" + "strings" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error { + + peerAddress := findClientAddress(stream.Context(), 0) + + clientName := fs.addClient(req.ClientName, peerAddress) + + defer fs.deleteClient(clientName) + + lastReadTime := time.Unix(0, req.SinceNs) + glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + + eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) + + eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) + + processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + + for { + lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.filer.MetaAggregator.ListenersLock.Lock() + fs.filer.MetaAggregator.ListenersCond.Wait() + fs.filer.MetaAggregator.ListenersLock.Unlock() + return true + }, eachLogEntryFn) + if err != nil { + glog.Errorf("processed to %v: %v", lastReadTime, err) + time.Sleep(3127 * time.Millisecond) + if err != log_buffer.ResumeError { + break + } + } + } + + return err + +} + +func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeLocalMetadataServer) error { + + peerAddress := findClientAddress(stream.Context(), 0) + + clientName := fs.addClient(req.ClientName, peerAddress) + + defer fs.deleteClient(clientName) + + lastReadTime := time.Unix(0, req.SinceNs) + glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + + eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) + + eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) + + // println("reading from persisted logs ...") + processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + + // println("reading from in memory logs ...") + for { + lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.listenersLock.Lock() + fs.listenersCond.Wait() + fs.listenersLock.Unlock() + return true + }, eachLogEntryFn) + if err != nil { + glog.Errorf("processed to %v: %v", lastReadTime, err) + time.Sleep(3127 * time.Millisecond) + if err != log_buffer.ResumeError { + break + } + } + } + + return err + +} + +func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error { + return func(logEntry *filer_pb.LogEntry) error { + event := &filer_pb.SubscribeMetadataResponse{} + if err := proto.Unmarshal(logEntry.Data, event); err != nil { + glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + } + + if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil { + return err + } + + return nil + } +} + +func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + + foundSelf := false + for _, sig := range eventNotification.Signatures { + if sig == clientSignature && clientSignature != 0 { + return nil + } + if sig == fs.filer.Signature { + foundSelf = true + } + } + if !foundSelf { + eventNotification.Signatures = append(eventNotification.Signatures, fs.filer.Signature) + } + + // get complete path to the file or directory + var entryName string + if eventNotification.OldEntry != nil { + entryName = eventNotification.OldEntry.Name + } else if eventNotification.NewEntry != nil { + entryName = eventNotification.NewEntry.Name + } + + fullpath := util.Join(dirPath, entryName) + + // skip on filer internal meta logs + if strings.HasPrefix(fullpath, filer.SystemLogDir) { + return nil + } + + if !strings.HasPrefix(fullpath, req.PathPrefix) { + return nil + } + + message := &filer_pb.SubscribeMetadataResponse{ + Directory: dirPath, + EventNotification: eventNotification, + TsNs: tsNs, + } + // println("sending", dirPath, entryName) + if err := stream.Send(message); err != nil { + glog.V(0).Infof("=> client %v: %+v", clientName, err) + return err + } + return nil + } +} + +func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) { + clientName = clientType + "@" + clientAddress + glog.V(0).Infof("+ listener %v", clientName) + return +} + +func (fs *FilerServer) deleteClient(clientName string) { + glog.V(0).Infof("- listener %v", clientName) +} diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 10b607dfe..065bb3251 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -8,6 +8,8 @@ import ( "sync" "time" + "github.com/chrislusf/seaweedfs/weed/stats" + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/util/grace" @@ -15,19 +17,19 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" - "github.com/chrislusf/seaweedfs/weed/filer2" - _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra" - _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd" - _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" - _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2" - _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb" - _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" - _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" - _ "github.com/chrislusf/seaweedfs/weed/filer2/redis" - _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2" + "github.com/chrislusf/seaweedfs/weed/filer" + _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" + _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" + _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb" + _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2" + _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" + _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" + _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" @@ -46,20 +48,26 @@ type FilerOption struct { MaxMB int DirListingLimit int DataCenter string + Rack string DefaultLevelDbDir string DisableHttp bool Host string Port uint32 recursiveDelete bool Cipher bool + Filers []string } type FilerServer struct { option *FilerOption secret security.SigningKey - filer *filer2.Filer + filer *filer.Filer grpcDialOption grpc.DialOption + // metrics read from the master + metricsAddress string + metricsIntervalSec int + // notifying clients listenersLock sync.Mutex listenersCond *sync.Cond @@ -81,11 +89,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) glog.Fatal("master list is required!") } - fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, fs.notifyMetaListeners) + fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() { + fs.listenersCond.Broadcast() + }) fs.filer.Cipher = option.Cipher - maybeStartMetrics(fs, option) + fs.checkWithMaster() + go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec) go fs.filer.KeepConnectedToMaster() v := util.GetViper() @@ -96,6 +107,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) if os.IsNotExist(err) { os.MkdirAll(option.DefaultLevelDbDir, 0755) } + glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir) } util.LoadConfiguration("notification", false) @@ -115,6 +127,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } + fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers) + fs.filer.LoadBuckets() grace.OnInterrupt(func() { @@ -124,9 +138,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) return fs, nil } -func maybeStartMetrics(fs *FilerServer, option *FilerOption) { +func (fs *FilerServer) checkWithMaster() { - for _, master := range option.Masters { + for _, master := range fs.option.Masters { _, err := pb.ParseFilerGrpcAddress(master) if err != nil { glog.Fatalf("invalid master address %s: %v", master, err) @@ -134,12 +148,19 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) { } isConnected := false - var metricsAddress string - var metricsIntervalSec int - var readErr error for !isConnected { - for _, master := range option.Masters { - metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master) + for _, master := range fs.option.Masters { + readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master %s configuration: %v", master, err) + } + fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) + if fs.option.DefaultReplication == "" { + fs.option.DefaultReplication = resp.DefaultReplication + } + return nil + }) if readErr == nil { isConnected = true } else { @@ -147,23 +168,5 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) { } } } - if metricsAddress == "" && metricsIntervalSec <= 0 { - return - } - go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather, - func() (addr string, intervalSeconds int) { - return metricsAddress, metricsIntervalSec - }) -} -func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) { - err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { - resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) - if err != nil { - return fmt.Errorf("get master %s configuration: %v", masterAddress, err) - } - metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) - return nil - }) - return } diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index b6bfc3b04..18f78881c 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -1,6 +1,7 @@ package weed_server import ( + "github.com/chrislusf/seaweedfs/weed/util" "net/http" "time" @@ -8,6 +9,7 @@ import ( ) func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION) start := time.Now() switch r.Method { case "GET": @@ -34,6 +36,7 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { } func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION) start := time.Now() switch r.Method { case "GET": diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 76c924df1..731bd3545 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -11,7 +11,7 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -93,29 +93,42 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, } } + //set tag count + if r.Method == "GET" { + tagCount := 0 + for k, _ := range entry.Extended { + if strings.HasPrefix(k, "x-amz-tagging-") { + tagCount++ + } + } + if tagCount > 0 { + w.Header().Set("x-amz-tag-count", strconv.Itoa(tagCount)) + } + } + // set etag - etag := filer2.ETagEntry(entry) + etag := filer.ETagEntry(entry) if inm := r.Header.Get("If-None-Match"); inm == "\""+etag+"\"" { w.WriteHeader(http.StatusNotModified) return } setEtag(w, etag) + filename := entry.Name() + adjustHeaderContentDisposition(w, r, filename) + if r.Method == "HEAD" { - w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10)) + w.Header().Set("Content-Length", strconv.FormatInt(int64(entry.Size()), 10)) return } - filename := entry.Name() - adjustHeadersAfterHEAD(w, r, filename) - - totalSize := int64(filer2.TotalSize(entry.Chunks)) + totalSize := int64(entry.Size()) if rangeReq := r.Header.Get("Range"); rangeReq == "" { ext := filepath.Ext(filename) width, height, mode, shouldResize := shouldResizeImages(ext, r) if shouldResize { - data, err := filer2.ReadAll(fs.filer.MasterClient, entry.Chunks) + data, err := filer.ReadAll(fs.filer.MasterClient, entry.Chunks) if err != nil { glog.Errorf("failed to read %s: %v", path, err) w.WriteHeader(http.StatusNotModified) @@ -128,7 +141,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, } processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error { - return filer2.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size) + return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size) }) } diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index ae28fc1db..99345550c 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -2,6 +2,9 @@ package weed_server import ( "context" + "encoding/base64" + "fmt" + "github.com/skip2/go-qrcode" "net/http" "strconv" "strings" @@ -32,7 +35,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque lastFileName := r.FormValue("lastFileName") - entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit) + entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, "") if err != nil { glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) @@ -65,21 +68,30 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque lastFileName, shouldDisplayLoadMore, }) - } else { - ui.StatusTpl.Execute(w, struct { - Path string - Breadcrumbs []ui.Breadcrumb - Entries interface{} - Limit int - LastFileName string - ShouldDisplayLoadMore bool - }{ - path, - ui.ToBreadcrumb(path), - entries, - limit, - lastFileName, - shouldDisplayLoadMore, - }) + return + } + + var qrImageString string + img, err := qrcode.Encode(fmt.Sprintf("http://%s:%d%s", fs.option.Host, fs.option.Port, r.URL.Path), qrcode.Medium, 128) + if err == nil { + qrImageString = base64.StdEncoding.EncodeToString(img) } + + ui.StatusTpl.Execute(w, struct { + Path string + Breadcrumbs []ui.Breadcrumb + Entries interface{} + Limit int + LastFileName string + ShouldDisplayLoadMore bool + QrImage string + }{ + path, + ui.ToBreadcrumb(path), + entries, + limit, + lastFileName, + shouldDisplayLoadMore, + qrImageString, + }) } diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 74a558e22..267b8752d 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -2,22 +2,11 @@ package weed_server import ( "context" - "crypto/md5" - "encoding/json" - "errors" - "fmt" - "io" - "io/ioutil" - "mime" "net/http" - "net/url" "os" - filenamePath "path" - "strconv" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -40,7 +29,7 @@ type FilerPostResult struct { Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, rack, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerRequestCounter.WithLabelValues("assign").Inc() start := time.Now() @@ -54,20 +43,20 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, DataCenter: dataCenter, } var altRequest *operation.VolumeAssignRequest - if dataCenter != "" { + if dataCenter != "" || rack != "" { altRequest = &operation.VolumeAssignRequest{ Count: 1, Replication: replication, Collection: collection, Ttl: ttlString, DataCenter: "", + Rack: "", } } assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest) if ae != nil { glog.Errorf("failing to assign a file id: %v", ae) - writeJsonError(w, r, http.StatusInternalServerError, ae) err = ae return } @@ -90,6 +79,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { if dataCenter == "" { dataCenter = fs.option.DataCenter } + rack := query.Get("rack") + if dataCenter == "" { + rack = fs.option.Rack + } ttlString := r.URL.Query().Get("ttl") // read ttl in seconds @@ -99,206 +92,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { ttlSeconds = int32(ttl.Minutes()) * 60 } - if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked { - return - } - - if fs.option.Cipher { - reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, err) - } else if reply != nil { - writeJsonQuiet(w, r, http.StatusCreated, reply) - } - - return - } - - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) - - if err != nil || fileId == "" || urlLocation == "" { - glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) - writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)) - return - } - - glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation) - - u, _ := url.Parse(urlLocation) - ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) - if err != nil { - return - } - - if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil { - return - } - - // send back post result - reply := FilerPostResult{ - Name: ret.Name, - Size: int64(ret.Size), - Error: ret.Error, - Fid: fileId, - Url: urlLocation, - } - setEtag(w, ret.ETag) - writeJsonQuiet(w, r, http.StatusCreated, reply) -} - -// update metadata in filer store -func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string, - collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) { - - stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc() - start := time.Now() - defer func() { - stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds()) - }() - - modeStr := r.URL.Query().Get("mode") - if modeStr == "" { - modeStr = "0660" - } - mode, err := strconv.ParseUint(modeStr, 8, 32) - if err != nil { - glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) - mode = 0660 - } - - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } - } - existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) - crTime := time.Now() - if err == nil && existingEntry != nil { - crTime = existingEntry.Crtime - } - entry := &filer2.Entry{ - FullPath: util.FullPath(path), - Attr: filer2.Attr{ - Mtime: time.Now(), - Crtime: crTime, - Mode: os.FileMode(mode), - Uid: OS_UID, - Gid: OS_GID, - Replication: replication, - Collection: collection, - TtlSec: ttlSeconds, - Mime: ret.Mime, - Md5: md5value, - }, - Chunks: []*filer_pb.FileChunk{{ - FileId: fileId, - Size: uint64(ret.Size), - Mtime: time.Now().UnixNano(), - ETag: ret.ETag, - }}, - } - if entry.Attr.Mime == "" { - if ext := filenamePath.Ext(path); ext != "" { - entry.Attr.Mime = mime.TypeByExtension(ext) - } - } - // glog.V(4).Infof("saving %s => %+v", path, entry) - if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil { - fs.filer.DeleteChunks(entry.Chunks) - glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) - writeJsonError(w, r, http.StatusInternalServerError, dbErr) - err = dbErr - return - } - - return nil -} - -// send request to volume server -func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) { - - stats.FilerRequestCounter.WithLabelValues("postUpload").Inc() - start := time.Now() - defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }() + fs.autoChunk(ctx, w, r, replication, collection, dataCenter, rack, ttlSeconds, ttlString, fsync) - ret = &operation.UploadResult{} - - md5Hash := md5.New() - body := r.Body - if r.Method == "PUT" { - // only PUT or large chunked files has Md5 in attributes - body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash)) - } - - request := &http.Request{ - Method: r.Method, - URL: u, - Proto: r.Proto, - ProtoMajor: r.ProtoMajor, - ProtoMinor: r.ProtoMinor, - Header: r.Header, - Body: body, - Host: r.Host, - ContentLength: r.ContentLength, - } - - if auth != "" { - request.Header.Set("Authorization", "BEARER "+string(auth)) - } - resp, doErr := util.Do(request) - if doErr != nil { - glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method) - writeJsonError(w, r, http.StatusInternalServerError, doErr) - err = doErr - return - } - defer func() { - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - }() - - respBody, raErr := ioutil.ReadAll(resp.Body) - if raErr != nil { - glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error()) - writeJsonError(w, r, http.StatusInternalServerError, raErr) - err = raErr - return - } - - glog.V(4).Infoln("post result", string(respBody)) - unmarshalErr := json.Unmarshal(respBody, &ret) - if unmarshalErr != nil { - glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody)) - writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr) - err = unmarshalErr - return - } - if ret.Error != "" { - err = errors.New(ret.Error) - glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error) - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - // find correct final path - path := r.URL.Path - if strings.HasSuffix(path, "/") { - if ret.Name != "" { - path += ret.Name - } else { - err = fmt.Errorf("can not to write to folder %s without a file name", path) - fs.filer.DeleteFileByFileId(fileId) - glog.V(0).Infoln("Can not to write to folder", path, "without a file name!") - writeJsonError(w, r, http.StatusInternalServerError, err) - return - } - } - // use filer calculated md5 ETag, instead of the volume server crc ETag - if r.Method == "PUT" { - md5value = md5Hash.Sum(nil) - } - ret.ETag = getEtag(resp) - return } // curl -X DELETE http://localhost:8888/path/to @@ -316,9 +111,14 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true" skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true" - err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, !skipChunkDeletion) + objectPath := r.URL.Path + if len(r.URL.Path) > 1 && strings.HasSuffix(objectPath, "/") { + objectPath = objectPath[0 : len(objectPath)-1] + } + + err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil) if err != nil { - glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error()) + glog.V(1).Infoln("deleting", objectPath, ":", err.Error()) httpStatus := http.StatusInternalServerError if err == filer_pb.ErrNotFound { httpStatus = http.StatusNotFound @@ -344,6 +144,7 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st } // required by buckets folder + bucketDefaultReplication := "" if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") { bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:] t := strings.Index(bucketAndObjectKey, "/") @@ -353,7 +154,10 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st if t > 0 { collection = bucketAndObjectKey[:t] } - replication, fsync = fs.filer.ReadBucketOption(collection) + bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(collection) + } + if replication == "" { + replication = bucketDefaultReplication } return diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 532693742..d86d49b2a 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -3,15 +3,18 @@ package weed_server import ( "context" "crypto/md5" + "fmt" + "hash" "io" "io/ioutil" "net/http" + "os" "path" "strconv" "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -20,12 +23,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool { - if r.Method != "POST" { - glog.V(4).Infoln("AutoChunking not supported for method", r.Method) - return false - } +func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string, rack string, ttlSec int32, ttlString string, fsync bool) { // autoChunking can be set at the command-line level or as a query param. Query param overrides command-line query := r.URL.Query() @@ -35,54 +33,47 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * if maxMB <= 0 && fs.option.MaxMB > 0 { maxMB = int32(fs.option.MaxMB) } - if maxMB <= 0 { - glog.V(4).Infoln("AutoChunking not enabled") - return false - } - glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)") chunkSize := 1024 * 1024 * maxMB - contentLength := int64(0) - if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 { - contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64) - if contentLength <= int64(chunkSize) { - glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.") - return false - } - } + stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() + start := time.Now() + defer func() { + stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds()) + }() - if contentLength <= 0 { - glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.") - return false + var reply *FilerPostResult + var err error + var md5bytes []byte + if r.Method == "POST" { + if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") { + reply, err = fs.mkdir(ctx, w, r) + } else { + reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, rack, ttlSec, ttlString, fsync) + } + } else { + reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, rack, ttlSec, ttlString, fsync) } - - reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else if reply != nil { + if len(md5bytes) > 0 { + w.Header().Set("Content-MD5", util.Base64Encode(md5bytes)) + } writeJsonQuiet(w, r, http.StatusCreated, reply) } - return true } -func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) { - - stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() - start := time.Now() - defer func() { - stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds()) - }() +func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, rack string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { multipartReader, multipartReaderErr := r.MultipartReader() if multipartReaderErr != nil { - return nil, multipartReaderErr + return nil, nil, multipartReaderErr } part1, part1Err := multipartReader.NextPart() if part1Err != nil { - return nil, part1Err + return nil, nil, part1Err } fileName := part1.FileName() @@ -90,48 +81,63 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r fileName = path.Base(fileName) } contentType := part1.Header.Get("Content-Type") + if contentType == "application/octet-stream" { + contentType = "" + } - var fileChunks []*filer_pb.FileChunk + fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, replication, collection, dataCenter, rack, ttlString, fileName, contentType, fsync) + if err != nil { + return nil, nil, err + } - md5Hash := md5.New() - var partReader = ioutil.NopCloser(io.TeeReader(part1, md5Hash)) + fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, rack, ttlString, fsync), fileChunks) + if replyerr != nil { + glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) + return + } - chunkOffset := int64(0) + md5bytes = md5Hash.Sum(nil) + filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset) - for chunkOffset < contentLength { - limitedReader := io.LimitReader(partReader, int64(chunkSize)) + return +} - // assign one file id for one chunk - fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) - if assignErr != nil { - return nil, assignErr - } +func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, rack string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) { - // upload the chunk to the volume server - uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth) - if uploadErr != nil { - return nil, uploadErr - } + fileName := "" + contentType := "" - // if last chunk exhausted the reader exactly at the border - if uploadResult.Size == 0 { - break - } + fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, replication, collection, dataCenter, rack, ttlString, fileName, contentType, fsync) + if err != nil { + return nil, nil, err + } - // Save to chunk manifest structure - fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) + fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, rack, ttlString, fsync), fileChunks) + if replyerr != nil { + glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr) + return + } - glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength) + md5bytes = md5Hash.Sum(nil) + filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset) - // reset variables for the next chunk - chunkOffset = chunkOffset + int64(uploadResult.Size) + return +} - // if last chunk was not at full chunk size, but already exhausted the reader - if int64(uploadResult.Size) < int64(chunkSize) { - break - } +func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) { + + // detect file mode + modeStr := r.URL.Query().Get("mode") + if modeStr == "" { + modeStr = "0660" + } + mode, err := strconv.ParseUint(modeStr, 8, 32) + if err != nil { + glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) + mode = 0660 } + // fix the path path := r.URL.Path if strings.HasSuffix(path, "/") { if fileName != "" { @@ -139,20 +145,28 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r } } + // fix the crTime + existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) + crTime := time.Now() + if err == nil && existingEntry != nil { + crTime = existingEntry.Crtime + } + glog.V(4).Infoln("saving", path) - entry := &filer2.Entry{ + entry := &filer.Entry{ FullPath: util.FullPath(path), - Attr: filer2.Attr{ + Attr: filer.Attr{ Mtime: time.Now(), - Crtime: time.Now(), - Mode: 0660, + Crtime: crTime, + Mode: os.FileMode(mode), Uid: OS_UID, Gid: OS_GID, Replication: replication, Collection: collection, TtlSec: ttlSec, Mime: contentType, - Md5: md5Hash.Sum(nil), + Md5: md5bytes, + FileSize: uint64(chunkOffset), }, Chunks: fileChunks, } @@ -162,15 +176,57 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r Size: chunkOffset, } - if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { fs.filer.DeleteChunks(entry.Chunks) replyerr = dbErr filerResult.Error = dbErr.Error() glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) - return } + return filerResult, replyerr +} - return +func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, replication string, collection string, dataCenter string, rack string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) { + var fileChunks []*filer_pb.FileChunk + + md5Hash := md5.New() + var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) + + chunkOffset := int64(0) + + for { + limitedReader := io.LimitReader(partReader, int64(chunkSize)) + + // assign one file id for one chunk + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, rack, ttlString, fsync) + if assignErr != nil { + return nil, nil, 0, assignErr + } + + // upload the chunk to the volume server + uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth) + if uploadErr != nil { + return nil, nil, 0, uploadErr + } + + // if last chunk exhausted the reader exactly at the border + if uploadResult.Size == 0 { + break + } + + // Save to chunk manifest structure + fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) + + glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size)) + + // reset variables for the next chunk + chunkOffset = chunkOffset + int64(uploadResult.Size) + + // if last chunk was not at full chunk size, but already exhausted the reader + if int64(uploadResult.Size) < int64(chunkSize) { + break + } + } + return fileChunks, md5Hash, chunkOffset, nil } func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) { @@ -184,3 +240,71 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht uploadResult, err, _ := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) return uploadResult, err } + +func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, rack string, ttlString string, fsync bool) filer.SaveDataAsChunkFunctionType { + + return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) { + // assign one file id for one chunk + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, rack, ttlString, fsync) + if assignErr != nil { + return nil, "", "", assignErr + } + + // upload the chunk to the volume server + uploadResult, uploadErr, _ := operation.Upload(urlLocation, name, fs.option.Cipher, reader, false, "", nil, auth) + if uploadErr != nil { + return nil, "", "", uploadErr + } + + return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil + } +} + +func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http.Request) (filerResult *FilerPostResult, replyerr error) { + + // detect file mode + modeStr := r.URL.Query().Get("mode") + if modeStr == "" { + modeStr = "0660" + } + mode, err := strconv.ParseUint(modeStr, 8, 32) + if err != nil { + glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr) + mode = 0660 + } + + // fix the path + path := r.URL.Path + if strings.HasSuffix(path, "/") { + path = path[:len(path)-1] + } + + existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) + if err == nil && existingEntry != nil { + replyerr = fmt.Errorf("dir %s already exists", path) + return + } + + glog.V(4).Infoln("mkdir", path) + entry := &filer.Entry{ + FullPath: util.FullPath(path), + Attr: filer.Attr{ + Mtime: time.Now(), + Crtime: time.Now(), + Mode: os.FileMode(mode) | os.ModeDir, + Uid: OS_UID, + Gid: OS_GID, + }, + } + + filerResult = &FilerPostResult{ + Name: util.FullPath(path).Name(), + } + + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { + replyerr = dbErr + filerResult.Error = dbErr.Error() + glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr) + } + return filerResult, replyerr +} diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index bea72b2c1..720d97027 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -7,7 +7,7 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" @@ -16,10 +16,9 @@ import ( ) // handling single chunk POST or PUT upload -func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, - replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) { +func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string, rack string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) { - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, rack, ttlString, fsync) if err != nil || fileId == "" || urlLocation == "" { return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) @@ -38,6 +37,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht } if pu.MimeType == "" { pu.MimeType = http.DetectContentType(uncompressedData) + // println("detect2 mimetype to", pu.MimeType) } uploadResult, uploadError := operation.UploadData(urlLocation, pu.FileName, true, uncompressedData, false, pu.MimeType, pu.PairMap, auth) @@ -57,9 +57,9 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht } } - entry := &filer2.Entry{ + entry := &filer.Entry{ FullPath: util.FullPath(path), - Attr: filer2.Attr{ + Attr: filer.Attr{ Mtime: time.Now(), Crtime: time.Now(), Mode: 0660, @@ -69,6 +69,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht Collection: collection, TtlSec: ttlSeconds, Mime: pu.MimeType, + Md5: util.Base64Md5ToBytes(pu.ContentMd5), }, Chunks: fileChunks, } @@ -78,7 +79,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht Size: int64(pu.OriginalDataSize), } - if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { fs.filer.DeleteChunks(entry.Chunks) err = dbErr filerResult.Error = dbErr.Error() diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go index e532b27e2..f86dde5b1 100644 --- a/weed/server/filer_ui/templates.go +++ b/weed/server/filer_ui/templates.go @@ -3,18 +3,29 @@ package master_ui import ( "github.com/dustin/go-humanize" "html/template" + "net/url" + "strings" ) +func printpath(parts ...string) string { + concat := strings.Join(parts, "") + escaped := url.PathEscape(concat) + return strings.ReplaceAll(escaped, "%2F", "/") +} + var funcMap = template.FuncMap{ "humanizeBytes": humanize.Bytes, + "printpath": printpath, } var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html> <html> <head> - <title>SeaweedFS Filer</title> - <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> + <title>SeaweedFS Filer</title> + <meta name="viewport" content="width=device-width, initial-scale=1"> + <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> <style> +body { padding-bottom: 70px; } #drop-area { border: 1px transparent; } @@ -37,6 +48,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC #fileElem { display: none; } +.qrImage { + display: block; + margin-left: auto; + margin-right: auto; +} </style> </head> <body> @@ -50,7 +66,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <div class="row"> <div> {{ range $entry := .Breadcrumbs }} - <a href="{{ $entry.Link }}" > + <a href="{{ printpath $entry.Link }}" > {{ $entry.Name }} </a> {{ end }} @@ -69,11 +85,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <td> {{if $entry.IsDirectory}} <img src="/seaweedfsstatic/images/folder.gif" width="20" height="23"> - <a href={{ print $path "/" $entry.Name "/"}} > + <a href="{{ printpath $path "/" $entry.Name "/"}}" > {{ $entry.Name }} </a> {{else}} - <a href={{ print $path "/" $entry.Name }} > + <a href="{{ printpath $path "/" $entry.Name }}" > {{ $entry.Name }} </a> {{end}} @@ -107,6 +123,14 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC </a> </div> {{end}} + + <br/> + <br/> + + <div class="navbar navbar-fixed-bottom"> + <img src="data:image/png;base64,{{.QrImage}}" class="qrImage" /> + </div> + </div> </body> <script type="text/javascript"> diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 1ee214deb..e8fa3995d 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "net" "strings" "time" @@ -12,21 +13,19 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" - "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" ) func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error { var dn *topology.DataNode - t := ms.Topo defer func() { if dn != nil { // if the volume server disconnects and reconnects quickly // the unregister and register can race with each other - t.UnRegisterDataNode(dn) + ms.Topo.UnRegisterDataNode(dn) glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port) message := &master_pb.VolumeLocation{ @@ -62,21 +61,18 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ return err } - t.Sequence.SetMax(heartbeat.MaxFileKey) + ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey) if dn == nil { - dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) - dc := t.GetOrCreateDataCenter(dcName) + dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) + dc := ms.Topo.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, int64(heartbeat.MaxVolumeCount)) glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ - VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, - MetricsAddress: ms.option.MetricsAddress, - MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), - StorageBackends: backend.ToPbStorageBackends(), + VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, }); err != nil { glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err) return err @@ -102,12 +98,12 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ message.DeletedVids = append(message.DeletedVids, volInfo.Id) } // update master internal volume layouts - t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn) + ms.Topo.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn) } if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes { // process heartbeat.Volumes - newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn) + newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn) for _, v := range newVolumes { glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url()) @@ -122,7 +118,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 { // update master internal volume layouts - t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn) + ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn) for _, s := range heartbeat.NewEcShards { message.NewVids = append(message.NewVids, s.Id) @@ -137,8 +133,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards { - glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) - newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn) + glog.V(1).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) + newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn) // broadcast the ec vid changes to master clients for _, s := range newShards { @@ -163,7 +159,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } // tell the volume servers about the leader - newLeader, err := t.Leader() + newLeader, err := ms.Topo.Leader() if err != nil { glog.Warningf("SendHeartbeat find leader: %v", err) return err @@ -192,7 +188,8 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ peerAddress := findClientAddress(stream.Context(), req.GrpcPort) - stopChan := make(chan bool) + // buffer by 1 so we don't end up getting stuck writing to stopChan forever + stopChan := make(chan bool, 1) clientName, messageChan := ms.addClient(req.Name, peerAddress) @@ -252,7 +249,12 @@ func (ms *MasterServer) addClient(clientType string, clientAddress string) (clie clientName = clientType + "@" + clientAddress glog.V(0).Infof("+ client %v", clientName) - messageChan = make(chan *master_pb.VolumeLocation) + // we buffer this because otherwise we end up in a potential deadlock where + // the KeepConnected loop is no longer listening on this channel but we're + // trying to send to it in SendHeartbeat and so we can't lock the + // clientChansLock to remove the channel and we're stuck writing to it + // 100 is probably overkill + messageChan = make(chan *master_pb.VolumeLocation, 100) ms.clientChansLock.Lock() ms.clientChans[clientName] = messageChan @@ -301,3 +303,19 @@ func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.Li } return resp, nil } + +func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) { + + // tell the volume servers about the leader + leader, _ := ms.Topo.Leader() + + resp := &master_pb.GetMasterConfigurationResponse{ + MetricsAddress: ms.option.MetricsAddress, + MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), + StorageBackends: backend.ToPbStorageBackends(), + DefaultReplication: ms.option.DefaultReplicaPlacement, + Leader: leader, + } + + return resp, nil +} diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 282c75679..03b718291 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -3,7 +3,6 @@ package weed_server import ( "context" "fmt" - "github.com/chrislusf/raft" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" @@ -178,13 +177,3 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku return resp, nil } - -func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) { - - resp := &master_pb.GetMasterConfigurationResponse{ - MetricsAddress: ms.option.MetricsAddress, - MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec), - } - - return resp, nil -} diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 9a490bb1f..cc1c4b2ad 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -7,7 +7,6 @@ import ( "net/url" "os" "regexp" - "strconv" "strings" "sync" "time" @@ -32,11 +31,11 @@ const ( ) type MasterOption struct { - Host string - Port int - MetaFolder string - VolumeSizeLimitMB uint - VolumePreallocate bool + Host string + Port int + MetaFolder string + VolumeSizeLimitMB uint + VolumePreallocate bool // PulseSeconds int DefaultReplicaPlacement string GarbageThreshold float64 @@ -66,7 +65,7 @@ type MasterServer struct { MasterClient *wdclient.MasterClient - adminLocks *AdminLocks + adminLocks *AdminLocks } func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer { @@ -139,14 +138,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { ms.Topo.RaftServer = raftServer.raftServer ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { - glog.V(0).Infof("event: %+v", e) + glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) if ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") } }) - ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) { - glog.V(0).Infof("state change: %+v", e) - }) if ms.Topo.IsLeader() { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") } else { @@ -210,7 +206,7 @@ func (ms *MasterServer) startAdminScripts() { scriptLines = append(scriptLines, "unlock") } - masterAddress := "localhost:" + strconv.Itoa(ms.option.Port) + masterAddress := fmt.Sprintf("%s:%d", ms.option.Host, ms.option.Port) var shellOptions shell.ShellOptions shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master") diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 7595c0171..34235384f 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -110,7 +110,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) } else { url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path } - http.Redirect(w, r, url, http.StatusMovedPermanently) + http.Redirect(w, r, url, http.StatusPermanentRedirect) } else { writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error)) } diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go index 7189064d0..60873f6aa 100644 --- a/weed/server/master_ui/templates.go +++ b/weed/server/master_ui/templates.go @@ -88,7 +88,11 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html> <tr> <td><code>{{ $dc.Id }}</code></td> <td>{{ $rack.Id }}</td> - <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a></td> + <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a> + {{ if ne $dn.PublicUrl $dn.Url }} + / <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a> + {{ end }} + </td> <td>{{ $dn.Volumes }}</td> <td>{{ $dn.VolumeIds}}</td> <td>{{ $dn.EcShards }}</td> diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 0381c7feb..85841e409 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,10 +2,9 @@ package weed_server import ( "encoding/json" - "io/ioutil" + "math/rand" "os" "path" - "reflect" "sort" "time" @@ -28,7 +27,31 @@ type RaftServer struct { *raft.GrpcServer } -func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer { +type StateMachine struct { + raft.StateMachine + topo *topology.Topology +} + +func (s StateMachine) Save() ([]byte, error) { + state := topology.MaxVolumeIdCommand{ + MaxVolumeId: s.topo.GetMaxVolumeId(), + } + glog.V(1).Infof("Save raft state %+v", state) + return json.Marshal(state) +} + +func (s StateMachine) Recovery(data []byte) error { + state := topology.MaxVolumeIdCommand{} + err := json.Unmarshal(data, &state) + if err != nil { + return err + } + glog.V(1).Infof("Recovery raft state %+v", state) + s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId) + return nil +} + +func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) { s := &RaftServer{ peers: peers, serverAddr: serverAddr, @@ -46,47 +69,66 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d transporter := raft.NewGrpcTransporter(grpcDialOption) glog.V(0).Infof("Starting RaftServer with %v", serverAddr) - // Clear old cluster configurations if peers are changed - if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed { - glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers) + if !raftResumeState { + // always clear previous metadata os.RemoveAll(path.Join(s.dataDir, "conf")) os.RemoveAll(path.Join(s.dataDir, "log")) os.RemoveAll(path.Join(s.dataDir, "snapshot")) } + if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil { + return nil, err + } - s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "") + stateMachine := StateMachine{topo: topo} + s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "") if err != nil { glog.V(0).Infoln(err) - return nil + return nil, err + } + s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond) + s.raftServer.SetElectionTimeout(10 * time.Second) + if err := s.raftServer.LoadSnapshot(); err != nil { + return nil, err + } + if err := s.raftServer.Start(); err != nil { + return nil, err } - s.raftServer.SetHeartbeatInterval(500 * time.Millisecond) - s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond) - s.raftServer.Start() for _, peer := range s.peers { - s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)) + if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil { + return nil, err + } + } + + // Remove deleted peers + for existsPeerName := range s.raftServer.Peers() { + exists, existingPeer := false, "" + for _, peer := range s.peers { + if pb.ServerToGrpcAddress(peer) == existsPeerName { + exists, existingPeer = true, peer + break + } + } + if exists { + if err := s.raftServer.RemovePeer(existsPeerName); err != nil { + glog.V(0).Infoln(err) + return nil, err + } else { + glog.V(0).Infof("removing old peer %s", existingPeer) + } + } } s.GrpcServer = raft.NewGrpcServer(s.raftServer) if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) { // Initialize the server by joining itself. - glog.V(0).Infoln("Initializing new cluster") - - _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ - Name: s.raftServer.Name(), - ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), - }) - - if err != nil { - glog.V(0).Infoln(err) - return nil - } + // s.DoJoinCommand() } glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader()) - return s + return s, nil } func (s *RaftServer) Peers() (members []string) { @@ -99,34 +141,6 @@ func (s *RaftServer) Peers() (members []string) { return } -func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) { - confPath := path.Join(dir, "conf") - // open conf file - b, err := ioutil.ReadFile(confPath) - if err != nil { - return oldPeers, true - } - conf := &raft.Config{} - if err = json.Unmarshal(b, conf); err != nil { - return oldPeers, true - } - - for _, p := range conf.Peers { - oldPeers = append(oldPeers, p.Name) - } - oldPeers = append(oldPeers, self) - - if len(peers) == 0 && len(oldPeers) <= 1 { - return oldPeers, false - } - - sort.Strings(peers) - sort.Strings(oldPeers) - - return oldPeers, !reflect.DeepEqual(peers, oldPeers) - -} - func isTheFirstOne(self string, peers []string) bool { sort.Strings(peers) if len(peers) <= 0 { @@ -134,3 +148,16 @@ func isTheFirstOne(self string, peers []string) bool { } return self == peers[0] } + +func (s *RaftServer) DoJoinCommand() { + + glog.V(0).Infoln("Initializing new cluster") + + if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ + Name: s.raftServer.Name(), + ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), + }); err != nil { + glog.Errorf("fail to send join command: %v", err) + } + +} diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go index fd38cb977..252570eab 100644 --- a/weed/server/raft_server_handlers.go +++ b/weed/server/raft_server_handlers.go @@ -1,20 +1,24 @@ package weed_server import ( + "github.com/chrislusf/seaweedfs/weed/storage/needle" "net/http" ) type ClusterStatusResult struct { - IsLeader bool `json:"IsLeader,omitempty"` - Leader string `json:"Leader,omitempty"` - Peers []string `json:"Peers,omitempty"` + IsLeader bool `json:"IsLeader,omitempty"` + Leader string `json:"Leader,omitempty"` + Peers []string `json:"Peers,omitempty"` + MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"` } func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) { ret := ClusterStatusResult{ - IsLeader: s.topo.IsLeader(), - Peers: s.Peers(), + IsLeader: s.topo.IsLeader(), + Peers: s.Peers(), + MaxVolumeId: s.topo.GetMaxVolumeId(), } + if leader, e := s.topo.Leader(); e == nil { ret.Leader = leader } diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 27b21ac09..9296c63e9 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -10,6 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" + "github.com/chrislusf/seaweedfs/weed/storage/types" ) func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) { @@ -148,7 +149,35 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv } return resp, err +} + +func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) { + + resp := &volume_server_pb.VolumeMarkWritableResponse{} + err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId)) + + if err != nil { + glog.Errorf("volume mark writable %v: %v", req, err) + } else { + glog.V(2).Infof("volume mark writable %v", req) + } + + return resp, err +} + +func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) { + + resp := &volume_server_pb.VolumeStatusResponse{} + + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("not found volume id %d", req.VolumeId) + } + + resp.IsReadOnly = v.IsReadOnly() + + return resp, nil } func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) { @@ -166,3 +195,54 @@ func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_serv return resp, nil } + +func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) { + + resp := &volume_server_pb.VolumeServerLeaveResponse{} + + vs.StopHeartbeat() + + return resp, nil + +} + +func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) { + + resp := &volume_server_pb.VolumeNeedleStatusResponse{} + + volumeId := needle.VolumeId(req.VolumeId) + + n := &needle.Needle{ + Id: types.NeedleId(req.NeedleId), + } + + var count int + var err error + hasVolume := vs.store.HasVolume(volumeId) + if !hasVolume { + _, hasEcVolume := vs.store.FindEcVolume(volumeId) + if !hasEcVolume { + return nil, fmt.Errorf("volume not found %d", req.VolumeId) + } + count, err = vs.store.ReadEcShardNeedle(volumeId, n) + } else { + count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil) + } + if err != nil { + return nil, err + } + if count < 0 { + return nil, fmt.Errorf("needle not found %d", n.Id) + } + + resp.NeedleId = uint64(n.Id) + resp.Cookie = uint32(n.Cookie) + resp.Size = uint32(n.Size) + resp.LastModified = n.LastModified + resp.Crc = n.Checksum.Value() + if n.HasTtl() { + resp.Ttl = n.Ttl.String() + } + return resp, nil + +} diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go index 501964191..8e84dc2a8 100644 --- a/weed/server/volume_grpc_batch_delete.go +++ b/weed/server/volume_grpc_batch_delete.go @@ -41,7 +41,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B } else { n.ParsePath(id_cookie) cookie := n.Cookie - if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil { + if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil { resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ FileId: fid, Status: http.StatusNotFound, @@ -79,7 +79,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ FileId: fid, Status: http.StatusAccepted, - Size: size}, + Size: uint32(size)}, ) } } diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index 7cb836344..199f8faba 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -2,7 +2,7 @@ package weed_server import ( "fmt" - "net" + "github.com/chrislusf/seaweedfs/weed/operation" "time" "google.golang.org/grpc" @@ -22,6 +22,31 @@ import ( func (vs *VolumeServer) GetMaster() string { return vs.currentMaster } + +func (vs *VolumeServer) checkWithMaster() (err error) { + isConnected := false + for !isConnected { + for _, master := range vs.SeedMasterNodes { + err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master %s configuration: %v", master, err) + } + vs.metricsAddress, vs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) + backend.LoadFromPbStorageBackends(resp.StorageBackends) + return nil + }) + if err == nil { + return + } else { + glog.V(0).Infof("checkWithMaster %s: %v", master, err) + } + } + time.Sleep(1790 * time.Millisecond) + } + return +} + func (vs *VolumeServer) heartbeat() { glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes) @@ -32,7 +57,7 @@ func (vs *VolumeServer) heartbeat() { var err error var newLeader string - for { + for vs.isHeartbeating { for _, master := range vs.SeedMasterNodes { if newLeader != "" { // the new leader may actually is the same master @@ -53,20 +78,35 @@ func (vs *VolumeServer) heartbeat() { newLeader = "" vs.store.MasterAddress = "" } + if !vs.isHeartbeating { + break + } } } } +func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) { + if !vs.isHeartbeating { + return true + } + vs.isHeartbeating = false + close(vs.stopChan) + return false +} + func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { - grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterNode, err) } defer grpcConection.Close() client := master_pb.NewSeaweedClient(grpcConection) - stream, err := client.SendHeartbeat(context.Background()) + stream, err := client.SendHeartbeat(ctx) if err != nil { glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err) return "", err @@ -87,23 +127,16 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit()) if vs.store.MaybeAdjustVolumeMax() { if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err) } } } - if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) { - glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode) + if in.GetLeader() != "" && vs.currentMaster != in.GetLeader() { + glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster) newLeader = in.GetLeader() doneChan <- nil return } - if in.GetMetricsAddress() != "" && vs.MetricsAddress != in.GetMetricsAddress() { - vs.MetricsAddress = in.GetMetricsAddress() - vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds()) - } - if len(in.StorageBackends) > 0 { - backend.LoadFromPbStorageBackends(in.StorageBackends) - } } }() @@ -182,19 +215,8 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi } case err = <-doneChan: return + case <-vs.stopChan: + return } } } - -func isSameIP(ip string, host string) bool { - ips, err := net.LookupIP(host) - if err != nil { - return false - } - for _, t := range ips { - if ip == t.String() { - return true - } - } - return false -} diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 5c7d5572c..17372eef4 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "io" + "io/ioutil" "math" "os" "time" @@ -27,17 +28,12 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId) - err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)) - if err != nil { - return nil, fmt.Errorf("failed to mount existing volume %d: %v", req.VolumeId, err) - } - - err = vs.store.DeleteVolume(needle.VolumeId(req.VolumeId)) + err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId)) if err != nil { return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err) } - glog.V(0).Infof("deleted exisitng volume %d before copying.", req.VolumeId) + glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId) } location := vs.store.FindFreeLocation() @@ -65,13 +61,14 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId)) + ioutil.WriteFile(volumeFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755) + // println("source:", volFileInfoResp.String()) - // copy ecx file - if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil { + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil { return err } - if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil { + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil { return err } @@ -79,6 +76,8 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo return err } + os.Remove(volumeFileName+".note") + return nil }) diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 66dd5bf8d..55e0261c8 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -38,6 +38,8 @@ Steps to apply erasure coding to .dat .idx files // VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) { + glog.V(0).Infof("VolumeEcShardsGenerate: %v", req) + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v == nil { return nil, fmt.Errorf("volume %d not found", req.VolumeId) @@ -48,16 +50,16 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) } - // write .ecx file - if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil { - return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err) - } - // write .ec00 ~ .ec13 files if err := erasure_coding.WriteEcFiles(baseFileName); err != nil { return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) } + // write .ecx file + if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil { + return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err) + } + // write .vif files if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil { return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) @@ -69,6 +71,8 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ // VolumeEcShardsRebuild generates the any of the missing .ec00 ~ .ec13 files func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) { + glog.V(0).Infof("VolumeEcShardsRebuild: %v", req) + baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) var rebuiltShardIds []uint32 @@ -99,6 +103,8 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s // VolumeEcShardsCopy copy the .ecx and some ec data slices func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) { + glog.V(0).Infof("VolumeEcShardsCopy: %v", req) + location := vs.store.FindFreeLocation() if location == nil { return nil, fmt.Errorf("no space left") @@ -201,9 +207,7 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se if err := os.Remove(baseFilename + ".ecx"); err != nil { return nil, err } - if err := os.Remove(baseFilename + ".ecj"); err != nil { - return nil, err - } + os.Remove(baseFilename + ".ecj") } if !hasIdxFile { // .vif is used for ec volumes and normal volumes @@ -215,6 +219,8 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) { + glog.V(0).Infof("VolumeEcShardsMount: %v", req) + for _, shardId := range req.ShardIds { err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId)) @@ -234,6 +240,8 @@ func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_ser func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) { + glog.V(0).Infof("VolumeEcShardsUnmount: %v", req) + for _, shardId := range req.ShardIds { err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId)) @@ -264,7 +272,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea if req.FileKey != 0 { _, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey)) - if size == types.TombstoneFileSize { + if size.IsDeleted() { return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{ IsDeleted: true, }) @@ -321,6 +329,8 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) { + glog.V(0).Infof("VolumeEcBlobDelete: %v", req) + resp := &volume_server_pb.VolumeEcBlobDeleteResponse{} for _, location := range vs.store.Locations { @@ -330,7 +340,7 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv if err != nil { return nil, fmt.Errorf("locate in local ec volume: %v", err) } - if size == types.TombstoneFileSize { + if size.IsDeleted() { return resp, nil } @@ -349,6 +359,8 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv // VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec01 ~ .ec14 files func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) { + glog.V(0).Infof("VolumeEcShardsToVolume: %v", req) + v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId)) if !found { return nil, fmt.Errorf("ec volume %d not found", req.VolumeId) diff --git a/weed/server/volume_grpc_file.go b/weed/server/volume_grpc_file.go deleted file mode 100644 index 4d71ddeb1..000000000 --- a/weed/server/volume_grpc_file.go +++ /dev/null @@ -1,129 +0,0 @@ -package weed_server - -import ( - "encoding/json" - "net/http" - "strings" - - "github.com/chrislusf/seaweedfs/weed/glog" - "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" - "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/util" -) - -func (vs *VolumeServer) FileGet(req *volume_server_pb.FileGetRequest, stream volume_server_pb.VolumeServer_FileGetServer) error { - - headResponse := &volume_server_pb.FileGetResponse{} - n := new(needle.Needle) - - commaIndex := strings.LastIndex(req.FileId, ",") - vid := req.FileId[:commaIndex] - fid := req.FileId[commaIndex+1:] - - volumeId, err := needle.NewVolumeId(vid) - if err != nil { - headResponse.ErrorCode = http.StatusBadRequest - return stream.Send(headResponse) - } - err = n.ParsePath(fid) - if err != nil { - headResponse.ErrorCode = http.StatusBadRequest - return stream.Send(headResponse) - } - - hasVolume := vs.store.HasVolume(volumeId) - _, hasEcVolume := vs.store.FindEcVolume(volumeId) - - if !hasVolume && !hasEcVolume { - headResponse.ErrorCode = http.StatusMovedPermanently - return stream.Send(headResponse) - } - - cookie := n.Cookie - var count int - if hasVolume { - count, err = vs.store.ReadVolumeNeedle(volumeId, n) - } else if hasEcVolume { - count, err = vs.store.ReadEcShardNeedle(volumeId, n) - } - - if err != nil || count < 0 { - headResponse.ErrorCode = http.StatusNotFound - return stream.Send(headResponse) - } - if n.Cookie != cookie { - headResponse.ErrorCode = http.StatusNotFound - return stream.Send(headResponse) - } - - if n.LastModified != 0 { - headResponse.LastModified = n.LastModified - } - - headResponse.Etag = n.Etag() - - if n.HasPairs() { - pairMap := make(map[string]string) - err = json.Unmarshal(n.Pairs, &pairMap) - if err != nil { - glog.V(0).Infoln("Unmarshal pairs error:", err) - } - headResponse.Headers = pairMap - } - - /* - // skip this, no redirection - if vs.tryHandleChunkedFile(n, filename, w, r) { - return - } - */ - - if n.NameSize > 0 { - headResponse.Filename = string(n.Name) - } - mtype := "" - if n.MimeSize > 0 { - mt := string(n.Mime) - if !strings.HasPrefix(mt, "application/octet-stream") { - mtype = mt - } - } - headResponse.ContentType = mtype - - headResponse.IsGzipped = n.IsGzipped() - - if n.IsGzipped() && req.AcceptGzip { - if n.Data, err = util.UnGzipData(n.Data); err != nil { - glog.V(0).Infof("ungzip %s error: %v", req.FileId, err) - } - } - - headResponse.ContentLength = uint32(len(n.Data)) - bytesToRead := len(n.Data) - bytesRead := 0 - - t := headResponse - - for bytesRead < bytesToRead { - - stopIndex := bytesRead + BufferSizeLimit - if stopIndex > bytesToRead { - stopIndex = bytesToRead - } - - if t == nil { - t = &volume_server_pb.FileGetResponse{} - } - t.Data = n.Data[bytesRead:stopIndex] - - err = stream.Send(t) - t = nil - if err != nil { - return err - } - - bytesRead = stopIndex - } - - return nil -} diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go index 767e28e7b..2f4fab96a 100644 --- a/weed/server/volume_grpc_query.go +++ b/weed/server/volume_grpc_query.go @@ -24,7 +24,7 @@ func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_ n.ParsePath(id_cookie) cookie := n.Cookie - if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil { + if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil { glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err) return err } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 62fbc19a7..83df32fdd 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -28,14 +28,16 @@ type VolumeServer struct { FixJpgOrientation bool ReadRedirect bool compactionBytePerSecond int64 - MetricsAddress string - MetricsIntervalSec int + metricsAddress string + metricsIntervalSec int fileSizeLimitBytes int64 + isHeartbeating bool + stopChan chan bool } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, port int, publicUrl string, - folders []string, maxCounts []int, minFreeSpacePercent []float32, + folders []string, maxCounts []int, minFreeSpacePercents []float32, needleMapKind storage.NeedleMapType, masterNodes []string, pulseSeconds int, dataCenter string, rack string, @@ -66,16 +68,21 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, + isHeartbeating: true, + stopChan: make(chan bool), } vs.SeedMasterNodes = masterNodes - vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercent, vs.needleMapKind) + + vs.checkWithMaster() + + vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind) vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) handleStaticResources(adminMux) + adminMux.HandleFunc("/status", vs.statusHandler) if signingKey == "" || enableUiAccess { // only expose the volume server details for safe environments adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) - adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) /* adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) @@ -90,11 +97,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, } go vs.heartbeat() - hostAddress := fmt.Sprintf("%s:%d", ip, port) - go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather, - func() (addr string, intervalSeconds int) { - return vs.MetricsAddress, vs.MetricsIntervalSec - }) + go stats.LoopPushingMetric("volumeServer", fmt.Sprintf("%s:%d", ip, port), vs.metricsAddress, vs.metricsIntervalSec) return vs } diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 14ad27d42..ad13cdf3b 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -1,6 +1,7 @@ package weed_server import ( + "github.com/chrislusf/seaweedfs/weed/util" "net/http" "strings" @@ -25,6 +26,7 @@ security settings: */ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) switch r.Method { case "GET", "HEAD": stats.ReadRequest() @@ -39,6 +41,7 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque } func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) switch r.Method { case "GET": stats.ReadRequest() diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go index 34655d833..4d84c9c4d 100644 --- a/weed/server/volume_server_handlers_admin.go +++ b/weed/server/volume_server_handlers_admin.go @@ -10,6 +10,7 @@ import ( ) func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) m := make(map[string]interface{}) m["Version"] = util.Version() var ds []*volume_server_pb.DiskStatus @@ -24,6 +25,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { } func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) m := make(map[string]interface{}) m["Version"] = util.Version() var ds []*volume_server_pb.DiskStatus diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 19b459136..15fd446e7 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -18,6 +18,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -26,6 +27,8 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"") func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { + // println(r.Method + " " + r.URL.Path) + stats.VolumeServerRequestCounter.WithLabelValues("get").Inc() start := time.Now() defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }() @@ -79,15 +82,24 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } cookie := n.Cookie + + readOption := &storage.ReadOption{ + ReadDeleted: r.FormValue("readDeleted") == "true", + } + var count int if hasVolume { - count, err = vs.store.ReadVolumeNeedle(volumeId, n) + count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption) } else if hasEcVolume { count, err = vs.store.ReadEcShardNeedle(volumeId, n) } + if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume { + glog.V(4).Infof("read needle: %v", err) + // start to fix it from other replicas, if not deleted and hasVolume and is not a replicated request + } // glog.V(4).Infoln("read bytes", count, "error", err) if err != nil || count < 0 { - glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err) + glog.V(3).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err) w.WriteHeader(http.StatusNotFound) return } @@ -142,20 +154,18 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - if ext != ".gz" { - if n.IsGzipped() { - if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { - if _, _, _, shouldResize := shouldResizeImages(ext, r); shouldResize { - if n.Data, err = util.UnGzipData(n.Data); err != nil { - glog.V(0).Infoln("ungzip error:", err, r.URL.Path) - } - } else { - w.Header().Set("Content-Encoding", "gzip") - } - } else { - if n.Data, err = util.UnGzipData(n.Data); err != nil { - glog.V(0).Infoln("ungzip error:", err, r.URL.Path) - } + if n.IsCompressed() { + if _, _, _, shouldResize := shouldResizeImages(ext, r); shouldResize { + if n.Data, err = util.DecompressData(n.Data); err != nil { + glog.V(0).Infoln("ungzip error:", err, r.URL.Path) + } + } else if strings.Contains(r.Header.Get("Accept-Encoding"), "zstd") && util.IsZstdContent(n.Data) { + w.Header().Set("Content-Encoding", "zstd") + } else if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && util.IsGzippedContent(n.Data) { + w.Header().Set("Content-Encoding", "gzip") + } else { + if n.Data, err = util.DecompressData(n.Data); err != nil { + glog.V(0).Infoln("uncompress error:", err, r.URL.Path) } } } @@ -172,7 +182,7 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, return false } - chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped()) + chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsCompressed()) if e != nil { glog.V(0).Infof("load chunked manifest (%s) error: %v", r.URL.Path, e) return false @@ -208,7 +218,9 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, func conditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext string, r *http.Request) io.ReadSeeker { rs := originalDataReaderSeeker - + if len(ext) > 0 { + ext = strings.ToLower(ext) + } width, height, mode, shouldResize := shouldResizeImages(ext, r) if shouldResize { rs, _, _ = images.Resized(ext, originalDataReaderSeeker, width, height, mode) @@ -217,9 +229,6 @@ func conditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext strin } func shouldResizeImages(ext string, r *http.Request) (width, height int, mode string, shouldResize bool) { - if len(ext) > 0 { - ext = strings.ToLower(ext) - } if ext == ".png" || ext == ".jpg" || ext == ".jpeg" || ext == ".gif" { if r.FormValue("width") != "" { width, _ = strconv.Atoi(r.FormValue("width")) @@ -245,13 +254,13 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re } w.Header().Set("Accept-Ranges", "bytes") + adjustHeaderContentDisposition(w, r, filename) + if r.Method == "HEAD" { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) return nil } - adjustHeadersAfterHEAD(w, r, filename) - processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error { if _, e = rs.Seek(offset, 0); e != nil { return e diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go index 8b2027e7b..e535327e2 100644 --- a/weed/server/volume_server_handlers_ui.go +++ b/weed/server/volume_server_handlers_ui.go @@ -13,6 +13,7 @@ import ( ) func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION) infos := make(map[string]interface{}) infos["Up Time"] = time.Now().Sub(startTime).String() var ds []*volume_server_pb.DiskStatus diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 9a00dcc29..01a77b901 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -13,6 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" + "github.com/chrislusf/seaweedfs/weed/util" ) func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { @@ -42,7 +43,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - reqNeedle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes) + reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes) if ne != nil { writeJsonError(w, r, http.StatusBadRequest, ne) return @@ -67,9 +68,10 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { ret.Name = string(reqNeedle.Name) } ret.Size = uint32(originalSize) - ret.ETag = reqNeedle.Etag() + ret.ETag = fmt.Sprintf("%x", util.Base64Md5ToBytes(contentMd5)) ret.Mime = string(reqNeedle.Mime) setEtag(w, ret.ETag) + w.Header().Set("Content-MD5", contentMd5) writeJsonQuiet(w, r, httpStatus, ret) } @@ -103,7 +105,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { return } - _, ok := vs.store.ReadVolumeNeedle(volumeId, n) + _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil) if ok != nil { m := make(map[string]uint32) m["size"] = 0 @@ -120,7 +122,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { count := int64(n.Size) if n.IsChunkedManifest() { - chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped()) + chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsCompressed()) if e != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Load chunks manifest error: %v", e)) return diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index d86664542..3e9f882e3 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -10,7 +10,6 @@ import ( "strings" "time" - "github.com/chrislusf/seaweedfs/weed/util/grace" "golang.org/x/net/webdav" "google.golang.org/grpc" @@ -20,7 +19,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" - "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/security" ) @@ -42,7 +41,7 @@ type WebDavOption struct { type WebDavServer struct { option *WebDavOption secret security.SigningKey - filer *filer2.Filer + filer *filer.Filer grpcDialOption grpc.DialOption Handler *webdav.Handler } @@ -68,9 +67,10 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) { type WebDavFileSystem struct { option *WebDavOption secret security.SigningKey - filer *filer2.Filer + filer *filer.Filer grpcDialOption grpc.DialOption - chunkCache *chunk_cache.ChunkCache + chunkCache *chunk_cache.TieredChunkCache + signature int32 } type FileInfo struct { @@ -94,19 +94,17 @@ type WebDavFile struct { isDirectory bool off int64 entry *filer_pb.Entry - entryViewCache []filer2.VisibleInterval + entryViewCache []filer.VisibleInterval reader io.ReaderAt } func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { - chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB) - grace.OnInterrupt(func() { - chunkCache.Shutdown() - }) + chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB, 1024*1024) return &WebDavFileSystem{ option: option, chunkCache: chunkCache, + signature: util.RandomInt32(), }, nil } @@ -120,8 +118,8 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) } -func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string { - return hostAndPort +func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string { + return location.Url } func clearName(name string) (string, error) { @@ -169,6 +167,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm Gid: fs.option.Gid, }, }, + Signatures: []int32{fs.signature}, } glog.V(1).Infof("mkdir: %v", request) @@ -220,6 +219,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f TtlSec: 0, }, }, + Signatures: []int32{fs.signature}, }); err != nil { return fmt.Errorf("create %s: %v", fullFilePath, err) } @@ -259,7 +259,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) dir, name := util.FullPath(fullFilePath).DirAndName() - return filer_pb.Remove(fs, dir, name, true, false, false) + return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature}) } @@ -338,7 +338,7 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F if err != nil { return nil, err } - fi.size = int64(filer2.TotalSize(entry.GetChunks())) + fi.size = int64(filer.FileSize(entry)) fi.name = string(fullpath) fi.mode = os.FileMode(entry.Attributes.FileMode) fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0) @@ -387,7 +387,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { Count: 1, Replication: "", Collection: f.fs.option.Collection, - ParentPath: dir, + Path: f.name, } resp, err := client.AssignVolume(ctx, request) @@ -426,8 +426,9 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { f.entry.Attributes.Replication = replication request := &filer_pb.UpdateEntryRequest{ - Directory: dir, - Entry: f.entry, + Directory: dir, + Entry: f.entry, + Signatures: []int32{f.fs.signature}, } if _, err := client.UpdateEntry(ctx, request); err != nil { @@ -470,16 +471,17 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { if err != nil { return 0, err } - if len(f.entry.Chunks) == 0 { + fileSize := int64(filer.FileSize(f.entry)) + if fileSize == 0 { return 0, io.EOF } if f.entryViewCache == nil { - f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks) + f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks) f.reader = nil } if f.reader == nil { - chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32) - f.reader = filer2.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache) + chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64) + f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize) } readSize, err = f.reader.ReadAt(p, f.off) @@ -487,11 +489,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize)) f.off += int64(readSize) - if err == io.EOF { - err = nil - } - - if err != nil { + if err != nil && err != io.EOF { glog.Errorf("file read %s: %v", f.name, err) } @@ -507,7 +505,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error { fi := FileInfo{ - size: int64(filer2.TotalSize(entry.GetChunks())), + size: int64(filer.FileSize(entry)), name: entry.Name, mode: os.FileMode(entry.Attributes.FileMode), modifiledTime: time.Unix(entry.Attributes.Mtime, 0), @@ -550,9 +548,9 @@ func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) { var err error switch whence { - case 0: + case io.SeekStart: f.off = 0 - case 2: + case io.SeekEnd: if fi, err := f.fs.stat(ctx, f.name); err != nil { return 0, err } else { |
