diff options
| author | Bl1tz23 <alex3angle@gmail.com> | 2021-08-10 13:45:24 +0300 |
|---|---|---|
| committer | Bl1tz23 <alex3angle@gmail.com> | 2021-08-10 13:45:24 +0300 |
| commit | 1c94b3d01340baad000188550fcf2ccab6ca80e5 (patch) | |
| tree | 12c3da17eb2d1a43fef78021a3d7c79110b0ff5f /weed/server | |
| parent | e6e57db530217ff57b3622b4672b03ebb6313e96 (diff) | |
| parent | f9cf9b93d32a2b01bc4d95ce7d24d86ef60be668 (diff) | |
| download | seaweedfs-1c94b3d01340baad000188550fcf2ccab6ca80e5.tar.xz seaweedfs-1c94b3d01340baad000188550fcf2ccab6ca80e5.zip | |
merge master, resolve conflicts
Diffstat (limited to 'weed/server')
31 files changed, 1114 insertions, 701 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index 571944c10..2cd2276eb 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -1,6 +1,7 @@ package weed_server import ( + "bytes" "encoding/json" "errors" "fmt" @@ -104,7 +105,9 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope } debug("parsing upload file...") - pu, pe := needle.ParseUpload(r, 256*1024*1024) + bytesBuffer := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(bytesBuffer) + pu, pe := needle.ParseUpload(r, 256*1024*1024, bytesBuffer) if pe != nil { writeJsonError(w, r, http.StatusBadRequest, pe) return @@ -277,6 +280,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) w.Header().Set("Content-Range", ra.contentRange(totalSize)) + w.WriteHeader(http.StatusPartialContent) err = writeFn(w, ra.start, ra.length) if err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 3821de6a9..08b01dd09 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -31,16 +31,7 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L } return &filer_pb.LookupDirectoryEntryResponse{ - Entry: &filer_pb.Entry{ - Name: req.Name, - IsDirectory: entry.IsDirectory(), - Attributes: filer.EntryAttributeToPb(entry), - Chunks: entry.Chunks, - Extended: entry.Extended, - HardLinkId: entry.HardLinkId, - HardLinkCounter: entry.HardLinkCounter, - Content: entry.Content, - }, + Entry: entry.ToProtoEntry(), }, nil } @@ -66,16 +57,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool { hasEntries = true if err = stream.Send(&filer_pb.ListEntriesResponse{ - Entry: &filer_pb.Entry{ - Name: entry.Name(), - IsDirectory: entry.IsDirectory(), - Chunks: entry.Chunks, - Attributes: filer.EntryAttributeToPb(entry), - Extended: entry.Extended, - HardLinkId: entry.HardLinkId, - HardLinkCounter: entry.HardLinkCounter, - Content: entry.Content, - }, + Entry: entry.ToProtoEntry(), }); err != nil { return false } @@ -161,15 +143,10 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2) } - createErr := fs.filer.CreateEntry(ctx, &filer.Entry{ - FullPath: util.JoinPath(req.Directory, req.Entry.Name), - Attr: filer.PbToEntryAttribute(req.Entry.Attributes), - Chunks: chunks, - Extended: req.Entry.Extended, - HardLinkId: filer.HardLinkId(req.Entry.HardLinkId), - HardLinkCounter: req.Entry.HardLinkCounter, - Content: req.Entry.Content, - }, req.OExcl, req.IsFromOtherCluster, req.Signatures) + newEntry := filer.FromPbEntry(req.Directory, req.Entry) + newEntry.Chunks = chunks + + createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures) if createErr == nil { fs.filer.DeleteChunks(garbage) @@ -196,35 +173,8 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2) } - newEntry := &filer.Entry{ - FullPath: util.JoinPath(req.Directory, req.Entry.Name), - Attr: entry.Attr, - Extended: req.Entry.Extended, - Chunks: chunks, - HardLinkId: filer.HardLinkId(req.Entry.HardLinkId), - HardLinkCounter: req.Entry.HardLinkCounter, - Content: req.Entry.Content, - } - - glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v", - fullpath, entry.Attr, len(entry.Chunks), entry.Chunks, - req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks, - entry.Extended, req.Entry.Extended) - - if req.Entry.Attributes != nil { - if req.Entry.Attributes.Mtime != 0 { - newEntry.Attr.Mtime = time.Unix(req.Entry.Attributes.Mtime, 0) - } - if req.Entry.Attributes.FileMode != 0 { - newEntry.Attr.Mode = os.FileMode(req.Entry.Attributes.FileMode) - } - newEntry.Attr.Uid = req.Entry.Attributes.Uid - newEntry.Attr.Gid = req.Entry.Attributes.Gid - newEntry.Attr.Mime = req.Entry.Attributes.Mime - newEntry.Attr.UserName = req.Entry.Attributes.UserName - newEntry.Attr.GroupNames = req.Entry.Attributes.GroupName - - } + newEntry := filer.FromPbEntry(req.Directory, req.Entry) + newEntry.Chunks = chunks if filer.EqualEntry(entry, newEntry) { return &filer_pb.UpdateEntryResponse{}, err @@ -259,14 +209,14 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry garbage = append(garbage, coveredChunks...) if newEntry.Attributes != nil { - so := fs.detectStorageOption(fullpath, + so, _ := fs.detectStorageOption(fullpath, newEntry.Attributes.Collection, newEntry.Attributes.Replication, newEntry.Attributes.TtlSec, newEntry.Attributes.DiskType, "", "", - ) + ) // ignore readonly error for capacity needed to manifestize chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks) if err != nil { // not good, but should be ok @@ -307,7 +257,11 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo } entry.Chunks = append(entry.Chunks, req.Chunks...) - so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "") + so, err := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "") + if err != nil { + glog.Warningf("detectStorageOption: %v", err) + return &filer_pb.AppendToEntryResponse{}, err + } entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks) if err != nil { // not good, but should be ok @@ -333,7 +287,11 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) { - so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack) + so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack) + if err != nil { + glog.V(3).Infof("AssignVolume: %v", err) + return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil + } assignRequest, altRequest := so.ToAssignRequests(int(req.Count)) @@ -436,6 +394,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb. Signature: fs.filer.Signature, MetricsAddress: fs.metricsAddress, MetricsIntervalSec: int32(fs.metricsIntervalSec), + Version: util.Version(), } glog.V(4).Infof("GetFilerConfiguration: %v", t) diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go new file mode 100644 index 000000000..2cbfd3319 --- /dev/null +++ b/weed/server/filer_grpc_server_remote.go @@ -0,0 +1,162 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/golang/protobuf/proto" + "strings" + "time" +) + +func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.DownloadToLocalRequest) (*filer_pb.DownloadToLocalResponse, error) { + + // load all mappings + mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE)) + if err != nil { + return nil, err + } + mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content) + if err != nil { + return nil, err + } + + // find mapping + var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation + var localMountedDir string + for k, loc := range mappings.Mappings { + if strings.HasPrefix(req.Directory, k) { + localMountedDir, remoteStorageMountedLocation = k, loc + } + } + if localMountedDir == "" { + return nil, fmt.Errorf("%s is not mounted", req.Directory) + } + + // find storage configuration + storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX)) + if err != nil { + return nil, err + } + storageConf := &filer_pb.RemoteConf{} + if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil { + return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr) + } + + // find the entry + entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name)) + if err == filer_pb.ErrNotFound { + return nil, err + } + + resp := &filer_pb.DownloadToLocalResponse{} + if entry.Remote == nil || entry.Remote.RemoteSize == 0 { + return resp, nil + } + + // detect storage option + so, err := fs.detectStorageOption(req.Directory, "", "000", 0, "", "", "") + if err != nil { + return resp, err + } + assignRequest, altRequest := so.ToAssignRequests(1) + + // find a good chunk size + chunkSize := int64(5 * 1024 * 1024) + chunkCount := entry.Remote.RemoteSize/chunkSize + 1 + for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 { + chunkSize *= 2 + chunkCount = entry.Remote.RemoteSize/chunkSize + 1 + } + + dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):]) + + var chunks []*filer_pb.FileChunk + + // FIXME limit on parallel + for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize { + size := chunkSize + if offset+chunkSize > entry.Remote.RemoteSize { + size = entry.Remote.RemoteSize - offset + } + + // assign one volume server + assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest) + if err != nil { + return resp, err + } + if assignResult.Error != "" { + return resp, fmt.Errorf("assign: %v", assignResult.Error) + } + fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid) + if assignResult.Error != "" { + return resp, fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr) + } + + // tell filer to tell volume server to download into needles + err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ + VolumeId: uint32(fileId.VolumeId), + NeedleId: uint64(fileId.Key), + Cookie: uint32(fileId.Cookie), + Offset: offset, + Size: size, + RemoteType: storageConf.Type, + RemoteName: storageConf.Name, + S3AccessKey: storageConf.S3AccessKey, + S3SecretKey: storageConf.S3SecretKey, + S3Region: storageConf.S3Region, + S3Endpoint: storageConf.S3Endpoint, + RemoteBucket: remoteStorageMountedLocation.Bucket, + RemotePath: string(dest), + }) + if fetchAndWriteErr != nil { + return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr) + } + return nil + }) + + if err != nil { + return nil, err + } + + chunks = append(chunks, &filer_pb.FileChunk{ + FileId: assignResult.Fid, + Offset: offset, + Size: uint64(size), + Mtime: time.Now().Unix(), + Fid: &filer_pb.FileId{ + VolumeId: uint32(fileId.VolumeId), + FileKey: uint64(fileId.Key), + Cookie: uint32(fileId.Cookie), + }, + }) + + } + + garbage := entry.Chunks + + newEntry := entry.ShallowClone() + newEntry.Chunks = chunks + newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry) + newEntry.Remote.LocalMtime = time.Now().Unix() + + // this skips meta data log events + + if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil { + return nil, err + } + fs.filer.DeleteChunks(garbage) + + fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil) + + resp.Entry = newEntry.ToProtoEntry() + + return resp, nil + +} diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index eadb970d5..8a11c91e3 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -33,7 +33,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err) } - moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName) + moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, req.Signatures) if moveErr != nil { fs.filer.RollbackTransaction(ctx) return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr) @@ -47,23 +47,23 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom return &filer_pb.AtomicRenameEntryResponse{}, nil } -func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error { +func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error { if entry.IsDirectory() { - if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName); err != nil { + if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, signatures); err != nil { return err } } return nil - }); err != nil { + }, signatures); err != nil { return fmt.Errorf("fail to move %s => %s: %v", oldParent.Child(entry.Name()), newParent.Child(newName), err) } return nil } -func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error { +func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { currentDirPath := oldParent.Child(entry.Name()) newDirPath := newParent.Child(newName) @@ -84,7 +84,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. for _, item := range entries { lastFileName = item.Name() // println("processing", lastFileName) - err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name()) + err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), signatures) if err != nil { return err } @@ -96,7 +96,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. return nil } -func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error { +func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error { oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName) @@ -115,8 +115,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat Extended: entry.Extended, Content: entry.Content, } - createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, nil) - if createErr != nil { + if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures); createErr != nil { return createErr } @@ -127,7 +126,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat } // delete old entry - deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, nil) + deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, signatures) if deleteErr != nil { return deleteErr } diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index d9f91b125..3fdac1b26 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -2,7 +2,6 @@ package weed_server import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/util/log_buffer" "strings" "time" @@ -12,6 +11,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/log_buffer" +) + +const ( + // MaxUnsyncedEvents send empty notification with timestamp when certain amount of events have been filtered + MaxUnsyncedEvents = 1e3 ) func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error { @@ -25,43 +30,54 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) + eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) var processedTsNs int64 - var err error + var readPersistedLogErr error + var readInMemoryLogErr error for { - processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) - if err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) + glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + + processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if readPersistedLogErr != nil { + return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr) } if processedTsNs != 0 { lastReadTime = time.Unix(0, processedTsNs) + } else { + if readInMemoryLogErr == log_buffer.ResumeFromDiskError { + time.Sleep(1127 * time.Millisecond) + continue + } } - lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + + lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool { fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersLock.Unlock() return true }, eachLogEntryFn) - if err != nil { - if err == log_buffer.ResumeFromDiskError { + if readInMemoryLogErr != nil { + if readInMemoryLogErr == log_buffer.ResumeFromDiskError { continue } - glog.Errorf("processed to %v: %v", lastReadTime, err) - time.Sleep(3127 * time.Millisecond) - if err != log_buffer.ResumeError { + glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) + if readInMemoryLogErr != log_buffer.ResumeError { break } } + + time.Sleep(1127 * time.Millisecond) } - return err + return readInMemoryLogErr } @@ -76,46 +92,52 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature) + eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName) eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn) var processedTsNs int64 - var err error + var readPersistedLogErr error + var readInMemoryLogErr error for { // println("reading from persisted logs ...") - processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) - if err != nil { - return fmt.Errorf("reading from persisted logs: %v", err) + glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + if readPersistedLogErr != nil { + return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr) } if processedTsNs != 0 { lastReadTime = time.Unix(0, processedTsNs) + } else { + if readInMemoryLogErr == log_buffer.ResumeFromDiskError { + time.Sleep(1127 * time.Millisecond) + continue + } } - // glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - // println("reading from in memory logs ...") + glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool { fs.listenersLock.Lock() fs.listenersCond.Wait() fs.listenersLock.Unlock() return true }, eachLogEntryFn) - if err != nil { - if err == log_buffer.ResumeFromDiskError { + if readInMemoryLogErr != nil { + if readInMemoryLogErr == log_buffer.ResumeFromDiskError { continue } - glog.Errorf("processed to %v: %v", lastReadTime, err) - time.Sleep(3127 * time.Millisecond) - if err != log_buffer.ResumeError { + glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) + time.Sleep(1127 * time.Millisecond) + if readInMemoryLogErr != log_buffer.ResumeError { break } } } - return err + return readInMemoryLogErr } @@ -135,12 +157,25 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati } } -func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { +func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + filtered := 0 + return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + defer func() { + if filtered > MaxUnsyncedEvents { + if err := stream.Send(&filer_pb.SubscribeMetadataResponse{ + EventNotification: &filer_pb.EventNotification{}, + TsNs: tsNs, + }); err == nil { + filtered = 0 + } + } + }() + filtered++ foundSelf := false for _, sig := range eventNotification.Signatures { - if sig == clientSignature && clientSignature != 0 { + if sig == req.Signature && req.Signature != 0 { return nil } if sig == fs.filer.Signature { @@ -187,6 +222,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe glog.V(0).Infof("=> client %v: %+v", clientName, err) return err } + filtered = 0 return nil } } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index dfb43c706..534bc4840 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -30,11 +30,11 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb" _ "github.com/chrislusf/seaweedfs/weed/filer/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2" - _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" _ "github.com/chrislusf/seaweedfs/weed/filer/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" + _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" @@ -149,6 +149,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs.filer.LoadFilerConf() + fs.filer.LoadRemoteStorageConfAndMapping() + grace.OnInterrupt(func() { fs.filer.Shutdown() }) diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index ed6bbb6f6..0389e1e18 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -1,6 +1,7 @@ package weed_server import ( + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" "net/http" "strings" @@ -34,11 +35,11 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { switch r.Method { case "GET": stats.FilerRequestCounter.WithLabelValues("get").Inc() - fs.GetOrHeadHandler(w, r, true) + fs.GetOrHeadHandler(w, r) stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) case "HEAD": stats.FilerRequestCounter.WithLabelValues("head").Inc() - fs.GetOrHeadHandler(w, r, false) + fs.GetOrHeadHandler(w, r) stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds()) case "DELETE": stats.FilerRequestCounter.WithLabelValues("delete").Inc() @@ -53,7 +54,8 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { // wait until in flight data is less than the limit contentLength := getContentLength(r) fs.inFlightDataLimitCond.L.Lock() - for atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit { + for fs.option.ConcurrentUploadLimit != 0 && atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit { + glog.V(4).Infof("wait because inflight data %d > %d", fs.inFlightDataSize, fs.option.ConcurrentUploadLimit) fs.inFlightDataLimitCond.Wait() } atomic.AddInt64(&fs.inFlightDataSize, contentLength) @@ -93,11 +95,11 @@ func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Reque switch r.Method { case "GET": stats.FilerRequestCounter.WithLabelValues("get").Inc() - fs.GetOrHeadHandler(w, r, true) + fs.GetOrHeadHandler(w, r) stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) case "HEAD": stats.FilerRequestCounter.WithLabelValues("head").Inc() - fs.GetOrHeadHandler(w, r, false) + fs.GetOrHeadHandler(w, r) stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds()) case "OPTIONS": stats.FilerRequestCounter.WithLabelValues("options").Inc() diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index ea0650ed8..fc9cacf39 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -3,6 +3,7 @@ package weed_server import ( "bytes" "context" + "fmt" "io" "mime" "net/http" @@ -21,7 +22,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) { +func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { path := r.URL.Path isForDirectory := strings.HasSuffix(path, "/") @@ -40,7 +41,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc() w.WriteHeader(http.StatusNotFound) } else { - glog.V(0).Infof("Internal %s: %v", path, err) + glog.Errorf("Internal %s: %v", path, err) stats.FilerRequestCounter.WithLabelValues("read.internalerror").Inc() w.WriteHeader(http.StatusInternalServerError) } @@ -101,7 +102,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, //Seaweed custom header are not visible to Vue or javascript seaweedHeaders := []string{} - for header, _ := range w.Header() { + for header := range w.Header() { if strings.HasPrefix(header, "Seaweed-") { seaweedHeaders = append(seaweedHeaders, header) } @@ -163,7 +164,20 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, } return err } - err = filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size) + chunks := entry.Chunks + if entry.IsInRemoteOnly() { + dir, name := entry.FullPath.DirAndName() + if resp, err := fs.DownloadToLocal(context.Background(), &filer_pb.DownloadToLocalRequest{ + Directory: dir, + Name: name, + }); err != nil { + return fmt.Errorf("cache %s: %v", entry.FullPath, err) + } else { + chunks = resp.Entry.Chunks + } + } + + err = filer.StreamContent(fs.filer.MasterClient, writer, chunks, offset, size) if err != nil { glog.Errorf("failed to stream content %s: %v", r.URL, err) } diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 95eba9d3d..8d11e664a 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -2,6 +2,7 @@ package weed_server import ( "context" + "errors" "net/http" "os" "strings" @@ -19,14 +20,14 @@ import ( var ( OS_UID = uint32(os.Getuid()) OS_GID = uint32(os.Getgid()) + + ErrReadOnly = errors.New("read only") ) type FilerPostResult struct { Name string `json:"name,omitempty"` Size int64 `json:"size,omitempty"` Error string `json:"error,omitempty"` - Fid string `json:"fid,omitempty"` - Url string `json:"url,omitempty"` } func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) { @@ -57,7 +58,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte ctx := context.Background() query := r.URL.Query() - so := fs.detectStorageOption0(r.RequestURI, + so, err := fs.detectStorageOption0(r.RequestURI, query.Get("collection"), query.Get("replication"), query.Get("ttl"), @@ -65,6 +66,15 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte query.Get("dataCenter"), query.Get("rack"), ) + if err != nil { + if err == ErrReadOnly { + w.WriteHeader(http.StatusInsufficientStorage) + } else { + glog.V(1).Infoln("post", r.RequestURI, ":", err.Error()) + w.WriteHeader(http.StatusInternalServerError) + } + return + } fs.autoChunk(ctx, w, r, contentLength, so) util.CloseRequest(r) @@ -105,7 +115,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType string, dataCenter, rack string) *operation.StorageOption { +func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack string) (*operation.StorageOption, error) { collection := util.Nvl(qCollection, fs.option.Collection) replication := util.Nvl(qReplication, fs.option.DefaultReplication) @@ -121,6 +131,10 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication rule := fs.filer.FilerConf.MatchStorageRule(requestURI) + if rule.ReadOnly { + return nil, ErrReadOnly + } + if ttlSeconds == 0 { ttl, err := needle.ReadTTL(rule.GetTtl()) if err != nil { @@ -138,10 +152,10 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication DiskType: util.Nvl(diskType, rule.DiskType), Fsync: fsync || rule.Fsync, VolumeGrowthCount: rule.VolumeGrowthCount, - } + }, nil } -func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) *operation.StorageOption { +func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) (*operation.StorageOption, error) { ttl, err := needle.ReadTTL(qTtl) if err != nil { diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index c43922ea9..a42e0fc97 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -214,11 +214,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa Size: int64(entry.FileSize), } - if entry.Extended == nil { - entry.Extended = make(map[string][]byte) - } - - SaveAmzMetaData(r, entry.Extended, false) + entry.Extended = SaveAmzMetaData(r, entry.Extended, false) for k, v := range r.Header { if len(v) > 0 && (strings.HasPrefix(k, needle.PairNamePrefix) || k == "Cache-Control" || k == "Expires") { diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index 8334d1618..acaa8f5ab 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -1,6 +1,7 @@ package weed_server import ( + "bytes" "context" "fmt" "net/http" @@ -30,7 +31,10 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht sizeLimit := int64(fs.option.MaxMB) * 1024 * 1024 - pu, err := needle.ParseUpload(r, sizeLimit) + bytesBuffer := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(bytesBuffer) + + pu, err := needle.ParseUpload(r, sizeLimit, bytesBuffer) uncompressedData := pu.Data if pu.IsGzipped { uncompressedData = pu.UncompressedData diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 540def563..2275ff1bc 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -7,7 +7,10 @@ import ( "io" "io/ioutil" "net/http" + "sort" "strings" + "sync" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/filer" @@ -19,85 +22,104 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) { - var fileChunks []*filer_pb.FileChunk +var bufPool = sync.Pool{ + New: func() interface{} { + return new(bytes.Buffer) + }, +} - md5Hash := md5.New() - var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) +func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { - chunkOffset := int64(0) - var smallContent []byte + md5Hash = md5.New() + var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) + var wg sync.WaitGroup + var bytesBufferCounter int64 + bytesBufferLimitCond := sync.NewCond(new(sync.Mutex)) + var fileChunksLock sync.Mutex for { + + // need to throttle used byte buffer + bytesBufferLimitCond.L.Lock() + for atomic.LoadInt64(&bytesBufferCounter) >= 4 { + glog.V(4).Infof("waiting for byte buffer %d", bytesBufferCounter) + bytesBufferLimitCond.Wait() + } + atomic.AddInt64(&bytesBufferCounter, 1) + bytesBufferLimitCond.L.Unlock() + + bytesBuffer := bufPool.Get().(*bytes.Buffer) + glog.V(4).Infof("received byte buffer %d", bytesBufferCounter) + limitedReader := io.LimitReader(partReader, int64(chunkSize)) - data, err := ioutil.ReadAll(limitedReader) - if err != nil { - return nil, nil, 0, err, nil + bytesBuffer.Reset() + + dataSize, err := bytesBuffer.ReadFrom(limitedReader) + + // data, err := ioutil.ReadAll(limitedReader) + if err != nil || dataSize == 0 { + bufPool.Put(bytesBuffer) + atomic.AddInt64(&bytesBufferCounter, -1) + bytesBufferLimitCond.Signal() + break } if chunkOffset == 0 && !isAppend(r) { - if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 { - smallContent = data - chunkOffset += int64(len(data)) + if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) { + chunkOffset += dataSize + smallContent = make([]byte, dataSize) + bytesBuffer.Read(smallContent) + bufPool.Put(bytesBuffer) + atomic.AddInt64(&bytesBufferCounter, -1) + bytesBufferLimitCond.Signal() break } } - dataReader := util.NewBytesReader(data) - - // retry to assign a different file id - var fileId, urlLocation string - var auth security.EncodedJwt - var assignErr, uploadErr error - var uploadResult *operation.UploadResult - for i := 0; i < 3; i++ { - // assign one file id for one chunk - fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so) - if assignErr != nil { - return nil, nil, 0, assignErr, nil - } - // upload the chunk to the volume server - uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth) - if uploadErr != nil { - time.Sleep(251 * time.Millisecond) - continue + wg.Add(1) + go func(offset int64) { + defer func() { + bufPool.Put(bytesBuffer) + atomic.AddInt64(&bytesBufferCounter, -1) + bytesBufferLimitCond.Signal() + wg.Done() + }() + + chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so) + if toChunkErr != nil { + uploadErr = toChunkErr } - break - } - if uploadErr != nil { - return nil, nil, 0, uploadErr, nil - } - - // if last chunk exhausted the reader exactly at the border - if uploadResult.Size == 0 { - break - } - if chunkOffset == 0 { - uploadedMd5 := util.Base64Md5ToBytes(uploadResult.ContentMd5) - readedMd5 := md5Hash.Sum(nil) - if !bytes.Equal(uploadedMd5, readedMd5) { - glog.Errorf("md5 %x does not match %x uploaded chunk %s to the volume server", readedMd5, uploadedMd5, uploadResult.Name) + if chunk != nil { + fileChunksLock.Lock() + fileChunks = append(fileChunks, chunk) + fileChunksLock.Unlock() + glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size)) } - } - - // Save to chunk manifest structure - fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) - - glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size)) + }(chunkOffset) // reset variables for the next chunk - chunkOffset = chunkOffset + int64(uploadResult.Size) + chunkOffset = chunkOffset + dataSize // if last chunk was not at full chunk size, but already exhausted the reader - if int64(uploadResult.Size) < int64(chunkSize) { + if dataSize < int64(chunkSize) { break } } + wg.Wait() + + if uploadErr != nil { + return nil, md5Hash, 0, uploadErr, nil + } + + sort.Slice(fileChunks, func(i, j int) bool { + return fileChunks[i].Offset < fileChunks[j].Offset + }) + return fileChunks, md5Hash, chunkOffset, nil, smallContent } -func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { +func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) { stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc() start := time.Now() @@ -111,3 +133,42 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht } return uploadResult, err, data } + +func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) (*filer_pb.FileChunk, error) { + dataReader := util.NewBytesReader(data) + + // retry to assign a different file id + var fileId, urlLocation string + var auth security.EncodedJwt + var uploadErr error + var uploadResult *operation.UploadResult + for i := 0; i < 3; i++ { + // assign one file id for one chunk + fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so) + if uploadErr != nil { + glog.V(4).Infof("retry later due to assign error: %v", uploadErr) + time.Sleep(time.Duration(i+1) * 251 * time.Millisecond) + continue + } + + // upload the chunk to the volume server + uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth) + if uploadErr != nil { + glog.V(4).Infof("retry later due to upload error: %v", uploadErr) + time.Sleep(time.Duration(i+1) * 251 * time.Millisecond) + continue + } + break + } + if uploadErr != nil { + glog.Errorf("upload error: %v", uploadErr) + return nil, uploadErr + } + + // if last chunk exhausted the reader exactly at the border + if uploadResult.Size == 0 { + return nil, nil + } + + return uploadResult.ToPbFileChunk(fileId, chunkOffset), nil +} diff --git a/weed/server/filer_ui/filer.html b/weed/server/filer_ui/filer.html new file mode 100644 index 000000000..84dc4d4d6 --- /dev/null +++ b/weed/server/filer_ui/filer.html @@ -0,0 +1,182 @@ +<!DOCTYPE html> +<html> +<head> + <title>SeaweedFS Filer</title> + <meta name="viewport" content="width=device-width, initial-scale=1"> + <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> + <style> + body { + padding-bottom: 128px; + } + + #drop-area { + border: 1px transparent; + } + + #drop-area.highlight { + border-color: purple; + border: 2px dashed #ccc; + } + + .button { + display: inline-block; + padding: 2px; + background: #ccc; + cursor: pointer; + border-radius: 2px; + border: 1px solid #ccc; + float: right; + } + + .button:hover { + background: #ddd; + } + + #fileElem { + display: none; + } + + .qrImage { + display: block; + margin-left: auto; + margin-right: auto; + } + </style> +</head> +<body> +<div class="container"> + <div class="page-header"> + <h1> + <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a> + SeaweedFS Filer + </h1> + </div> + <div class="row"> + <div> + {{ range $entry := .Breadcrumbs }} + <a href="{{ printpath $entry.Link }}"> + {{ $entry.Name }} + </a> + {{ end }} + <label class="button" for="fileElem">Upload</label> + </div> + </div> + + <div class="row" id="drop-area"> + <form class="upload-form"> + <input type="file" id="fileElem" multiple onchange="handleFiles(this.files)"> + + <table width="90%"> + {{$path := .Path }} + {{ range $entry_index, $entry := .Entries }} + <tr> + <td> + {{if $entry.IsDirectory}} + <img src="/seaweedfsstatic/images/folder.gif" width="20" height="23"> + <a href="{{ printpath $path "/" $entry.Name "/"}}" > + {{ $entry.Name }} + </a> + {{else}} + <a href="{{ printpath $path "/" $entry.Name }}" > + {{ $entry.Name }} + </a> + {{end}} + </td> + <td align="right" nowrap> + {{if $entry.IsDirectory}} + {{else}} + {{ $entry.Mime }} + {{end}} + </td> + <td align="right" nowrap> + {{if $entry.IsDirectory}} + {{else}} + {{ $entry.Size | humanizeBytes }} + {{end}} + </td> + <td nowrap> + {{ $entry.Timestamp.Format "2006-01-02 15:04" }} + </td> + </tr> + {{ end }} + + </table> + </form> + </div> + + {{if .ShouldDisplayLoadMore}} + <div class="row"> + <a href={{ print .Path "?limit=" .Limit "&lastFileName=" .LastFileName}} > + Load more + </a> + </div> + {{end}} + + <br/> + <br/> + + <div class="navbar navbar-fixed-bottom"> + <img src="data:image/png;base64,{{.QrImage}}" class="qrImage"/> + </div> + +</div> +</body> +<script type="text/javascript"> + // ************************ Drag and drop ***************** // + let dropArea = document.getElementById("drop-area") + +// Prevent default drag behaviors + ;['dragenter', 'dragover', 'dragleave', 'drop'].forEach(eventName => { + dropArea.addEventListener(eventName, preventDefaults, false) + document.body.addEventListener(eventName, preventDefaults, false) + }) + +// Highlight drop area when item is dragged over it + ;['dragenter', 'dragover'].forEach(eventName => { + dropArea.addEventListener(eventName, highlight, false) + }) + + ;['dragleave', 'drop'].forEach(eventName => { + dropArea.addEventListener(eventName, unhighlight, false) + }) + + // Handle dropped files + dropArea.addEventListener('drop', handleDrop, false) + + function preventDefaults(e) { + e.preventDefault() + e.stopPropagation() + } + + function highlight(e) { + dropArea.classList.add('highlight') + } + + function unhighlight(e) { + dropArea.classList.remove('highlight') + } + + function handleDrop(e) { + var dt = e.dataTransfer + var files = dt.files + + handleFiles(files) + } + + function handleFiles(files) { + files = [...files] + files.forEach(uploadFile) + window.location.reload() + } + + function uploadFile(file, i) { + var url = window.location.href + var xhr = new XMLHttpRequest() + var formData = new FormData() + xhr.open('POST', url, false) + + formData.append('file', file) + xhr.send(formData) + } +</script> +</html> diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go index 648b97f22..f9ef064bc 100644 --- a/weed/server/filer_ui/templates.go +++ b/weed/server/filer_ui/templates.go @@ -1,6 +1,7 @@ package filer_ui import ( + _ "embed" "github.com/dustin/go-humanize" "html/template" "net/url" @@ -18,178 +19,7 @@ var funcMap = template.FuncMap{ "printpath": printpath, } -var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html> -<html> -<head> - <title>SeaweedFS Filer</title> - <meta name="viewport" content="width=device-width, initial-scale=1"> - <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> -<style> -body { padding-bottom: 128px; } -#drop-area { - border: 1px transparent; -} -#drop-area.highlight { - border-color: purple; - border: 2px dashed #ccc; -} -.button { - display: inline-block; - padding: 2px; - background: #ccc; - cursor: pointer; - border-radius: 2px; - border: 1px solid #ccc; - float: right; -} -.button:hover { - background: #ddd; -} -#fileElem { - display: none; -} -.qrImage { - display: block; - margin-left: auto; - margin-right: auto; -} -</style> -</head> -<body> - <div class="container"> - <div class="page-header"> - <h1> - <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a> - SeaweedFS Filer - </h1> - </div> - <div class="row"> - <div> - {{ range $entry := .Breadcrumbs }} - <a href="{{ printpath $entry.Link }}" > - {{ $entry.Name }} - </a> - {{ end }} - <label class="button" for="fileElem">Upload</label> - </div> - </div> - - <div class="row" id="drop-area"> - <form class="upload-form"> - <input type="file" id="fileElem" multiple onchange="handleFiles(this.files)"> - - <table width="90%"> - {{$path := .Path }} - {{ range $entry_index, $entry := .Entries }} - <tr> - <td> - {{if $entry.IsDirectory}} - <img src="/seaweedfsstatic/images/folder.gif" width="20" height="23"> - <a href="{{ printpath $path "/" $entry.Name "/"}}" > - {{ $entry.Name }} - </a> - {{else}} - <a href="{{ printpath $path "/" $entry.Name }}" > - {{ $entry.Name }} - </a> - {{end}} - </td> - <td align="right" nowrap> - {{if $entry.IsDirectory}} - {{else}} - {{ $entry.Mime }} - {{end}} - </td> - <td align="right" nowrap> - {{if $entry.IsDirectory}} - {{else}} - {{ $entry.Size | humanizeBytes }} - {{end}} - </td> - <td nowrap> - {{ $entry.Timestamp.Format "2006-01-02 15:04" }} - </td> - </tr> - {{ end }} - - </table> - </form> - </div> - - {{if .ShouldDisplayLoadMore}} - <div class="row"> - <a href={{ print .Path "?limit=" .Limit "&lastFileName=" .LastFileName}} > - Load more - </a> - </div> - {{end}} - - <br/> - <br/> - - <div class="navbar navbar-fixed-bottom"> - <img src="data:image/png;base64,{{.QrImage}}" class="qrImage" /> - </div> - - </div> -</body> -<script type="text/javascript"> -// ************************ Drag and drop ***************** // -let dropArea = document.getElementById("drop-area") - -// Prevent default drag behaviors -;['dragenter', 'dragover', 'dragleave', 'drop'].forEach(eventName => { - dropArea.addEventListener(eventName, preventDefaults, false) - document.body.addEventListener(eventName, preventDefaults, false) -}) - -// Highlight drop area when item is dragged over it -;['dragenter', 'dragover'].forEach(eventName => { - dropArea.addEventListener(eventName, highlight, false) -}) +//go:embed filer.html +var filerHtml string -;['dragleave', 'drop'].forEach(eventName => { - dropArea.addEventListener(eventName, unhighlight, false) -}) - -// Handle dropped files -dropArea.addEventListener('drop', handleDrop, false) - -function preventDefaults (e) { - e.preventDefault() - e.stopPropagation() -} - -function highlight(e) { - dropArea.classList.add('highlight') -} - -function unhighlight(e) { - dropArea.classList.remove('highlight') -} - -function handleDrop(e) { - var dt = e.dataTransfer - var files = dt.files - - handleFiles(files) -} - -function handleFiles(files) { - files = [...files] - files.forEach(uploadFile) - window.location.reload() -} - -function uploadFile(file, i) { - var url = window.location.href - var xhr = new XMLHttpRequest() - var formData = new FormData() - xhr.open('POST', url, false) - - formData.append('file', file) - xhr.send(formData) -} -</script> -</html> -`)) +var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(filerHtml)) diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 3e6d9bb9e..50c9dbfdf 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -205,8 +205,8 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ _, err := stream.Recv() if err != nil { glog.V(2).Infof("- client %v: %v", clientName, err) - stopChan <- true - break + close(stopChan) + return } } }() diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 3a4951cc5..4132ce690 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -143,7 +143,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest maxTimeout = time.Second * 10 startTime = time.Now() ) - + for time.Now().Sub(startTime) < maxTimeout { fid, count, dn, err := ms.Topo.PickForWrite(req.Count, option) if err == nil { diff --git a/weed/server/master_server.go b/weed/server/master_server.go index 838803908..eab41524c 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -26,8 +26,9 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls" + SequencerType = "master.sequencer.type" + SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" ) type MasterOption struct { @@ -97,7 +98,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste ms := &MasterServer{ option: option, preallocateSize: preallocateSize, - vgCh: make(chan *topology.VolumeGrowRequest, 1 << 6), + vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), clientChans: make(map[string]chan *master_pb.VolumeLocation), grpcDialOption: grpcDialOption, MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers), @@ -227,6 +228,7 @@ func (ms *MasterServer) startAdminScripts() { shellOptions.Masters = &masterAddress shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(filerHostPort) + shellOptions.FilerAddress = filerHostPort shellOptions.Directory = "/" if err != nil { glog.V(0).Infof("failed to parse master.filer.default = %s : %v\n", filerHostPort, err) @@ -293,7 +295,8 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer } case "snowflake": var err error - seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port)) + snowflakeId := v.GetInt(SequencerSnowflakeId) + seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port), snowflakeId) if err != nil { glog.Error(err) seq = nil diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 974b3308f..0609732c7 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -123,7 +123,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) Count: writableVolumeCount, ErrCh: errCh, } - if err := <- errCh; err != nil { + if err := <-errCh; err != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("cannot grow volume group! %v", err)) return } diff --git a/weed/server/master_ui/master.html b/weed/server/master_ui/master.html new file mode 100644 index 000000000..e0241d66d --- /dev/null +++ b/weed/server/master_ui/master.html @@ -0,0 +1,110 @@ +<!DOCTYPE html> +<html> +<head> + <title>SeaweedFS {{ .Version }}</title> + <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> +</head> +<body> +<div class="container"> + <div class="page-header"> + <h1> + <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a> + SeaweedFS <small>{{ .Version }}</small> + </h1> + </div> + + <div class="row"> + <div class="col-sm-6"> + <h2>Cluster status</h2> + <table class="table table-condensed table-striped"> + <tbody> + <tr> + <th>Volume Size Limit</th> + <td>{{ .VolumeSizeLimitMB }}MB</td> + </tr> + <tr> + <th>Free</th> + <td>{{ .Topology.Free }}</td> + </tr> + <tr> + <th>Max</th> + <td>{{ .Topology.Max }}</td> + </tr> + {{ with .RaftServer }} + <tr> + <th>Leader</th> + <td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td> + </tr> + <tr> + <th>Other Masters</th> + <td class="col-sm-5"> + <ul class="list-unstyled"> + {{ range $k, $p := .Peers }} + <li><a href="http://{{ $p.Name }}/ui/index.html">{{ $p.Name }}</a></li> + {{ end }} + </ul> + </td> + </tr> + {{ end }} + </tbody> + </table> + </div> + + <div class="col-sm-6"> + <h2>System Stats</h2> + <table class="table table-condensed table-striped"> + <tr> + <th>Concurrent Connections</th> + <td>{{ .Counters.Connections.WeekCounter.Sum }}</td> + </tr> + {{ range $key, $val := .Stats }} + <tr> + <th>{{ $key }}</th> + <td>{{ $val }}</td> + </tr> + {{ end }} + </table> + </div> + </div> + + <div class="row"> + <h2>Topology</h2> + <table class="table table-striped"> + <thead> + <tr> + <th>Data Center</th> + <th>Rack</th> + <th>RemoteAddr</th> + <th>#Volumes</th> + <th>Volume Ids</th> + <th>#ErasureCodingShards</th> + <th>Max</th> + </tr> + </thead> + <tbody> + {{ range $dc_index, $dc := .Topology.DataCenters }} + {{ range $rack_index, $rack := $dc.Racks }} + {{ range $dn_index, $dn := $rack.DataNodes }} + <tr> + <td><code>{{ $dc.Id }}</code></td> + <td>{{ $rack.Id }}</td> + <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a> + {{ if ne $dn.PublicUrl $dn.Url }} + / <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a> + {{ end }} + </td> + <td>{{ $dn.Volumes }}</td> + <td>{{ $dn.VolumeIds}}</td> + <td>{{ $dn.EcShards }}</td> + <td>{{ $dn.Max }}</td> + </tr> + {{ end }} + {{ end }} + {{ end }} + </tbody> + </table> + </div> + +</div> +</body> +</html> diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go index 31b6353e9..415022b97 100644 --- a/weed/server/master_ui/templates.go +++ b/weed/server/master_ui/templates.go @@ -1,115 +1,11 @@ package master_ui import ( + _ "embed" "html/template" ) -var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html> -<html> - <head> - <title>SeaweedFS {{ .Version }}</title> - <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> - </head> - <body> - <div class="container"> - <div class="page-header"> - <h1> - <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a> - SeaweedFS <small>{{ .Version }}</small> - </h1> - </div> +//go:embed master.html +var masterHtml string - <div class="row"> - <div class="col-sm-6"> - <h2>Cluster status</h2> - <table class="table table-condensed table-striped"> - <tbody> - <tr> - <th>Volume Size Limit</th> - <td>{{ .VolumeSizeLimitMB }}MB</td> - </tr> - <tr> - <th>Free</th> - <td>{{ .Topology.Free }}</td> - </tr> - <tr> - <th>Max</th> - <td>{{ .Topology.Max }}</td> - </tr> - {{ with .RaftServer }} - <tr> - <th>Leader</th> - <td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td> - </tr> - <tr> - <th>Other Masters</th> - <td class="col-sm-5"><ul class="list-unstyled"> - {{ range $k, $p := .Peers }} - <li><a href="http://{{ $p.Name }}/ui/index.html">{{ $p.Name }}</a></li> - {{ end }} - </ul></td> - </tr> - {{ end }} - </tbody> - </table> - </div> - - <div class="col-sm-6"> - <h2>System Stats</h2> - <table class="table table-condensed table-striped"> - <tr> - <th>Concurrent Connections</th> - <td>{{ .Counters.Connections.WeekCounter.Sum }}</td> - </tr> - {{ range $key, $val := .Stats }} - <tr> - <th>{{ $key }}</th> - <td>{{ $val }}</td> - </tr> - {{ end }} - </table> - </div> - </div> - - <div class="row"> - <h2>Topology</h2> - <table class="table table-striped"> - <thead> - <tr> - <th>Data Center</th> - <th>Rack</th> - <th>RemoteAddr</th> - <th>#Volumes</th> - <th>Volume Ids</th> - <th>#ErasureCodingShards</th> - <th>Max</th> - </tr> - </thead> - <tbody> - {{ range $dc_index, $dc := .Topology.DataCenters }} - {{ range $rack_index, $rack := $dc.Racks }} - {{ range $dn_index, $dn := $rack.DataNodes }} - <tr> - <td><code>{{ $dc.Id }}</code></td> - <td>{{ $rack.Id }}</td> - <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a> - {{ if ne $dn.PublicUrl $dn.Url }} - / <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a> - {{ end }} - </td> - <td>{{ $dn.Volumes }}</td> - <td>{{ $dn.VolumeIds}}</td> - <td>{{ $dn.EcShards }}</td> - <td>{{ $dn.Max }}</td> - </tr> - {{ end }} - {{ end }} - {{ end }} - </tbody> - </table> - </div> - - </div> - </body> -</html> -`)) +var StatusTpl = template.Must(template.New("status").Parse(masterHtml)) diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 2bc108a23..898c3da12 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -225,9 +225,9 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv if !hasEcVolume { return nil, fmt.Errorf("volume not found %d", req.VolumeId) } - count, err = vs.store.ReadEcShardNeedle(volumeId, n) + count, err = vs.store.ReadEcShardNeedle(volumeId, n, nil) } else { - count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil) + count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil, nil) } if err != nil { return nil, err diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go index 8e84dc2a8..3645ad9c9 100644 --- a/weed/server/volume_grpc_batch_delete.go +++ b/weed/server/volume_grpc_batch_delete.go @@ -8,7 +8,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" - "github.com/chrislusf/seaweedfs/weed/storage/types" ) func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) { @@ -30,7 +29,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B n := new(needle.Needle) volumeId, _ := needle.NewVolumeId(vid) if req.SkipCookieCheck { - n.Id, err = types.ParseNeedleId(id_cookie) + n.Id, _, err = needle.ParseNeedleIdCookie(id_cookie) if err != nil { resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ FileId: fid, @@ -41,7 +40,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B } else { n.ParsePath(id_cookie) cookie := n.Cookie - if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil { + if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil { resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ FileId: fid, Status: http.StatusNotFound, diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go index 2f4fab96a..349d10097 100644 --- a/weed/server/volume_grpc_query.go +++ b/weed/server/volume_grpc_query.go @@ -24,7 +24,7 @@ func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_ n.ParsePath(id_cookie) cookie := n.Cookie - if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil { + if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil { glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err) return err } diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go new file mode 100644 index 000000000..fd9db2246 --- /dev/null +++ b/weed/server/volume_grpc_remote.go @@ -0,0 +1,56 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" +) + +func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) { + resp = &volume_server_pb.FetchAndWriteNeedleResponse{} + v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) + if v == nil { + return nil, fmt.Errorf("not found volume id %d", req.VolumeId) + } + + remoteConf := &filer_pb.RemoteConf{ + Type: req.RemoteType, + Name: req.RemoteName, + S3AccessKey: req.S3AccessKey, + S3SecretKey: req.S3SecretKey, + S3Region: req.S3Region, + S3Endpoint: req.S3Endpoint, + } + + client, getClientErr := remote_storage.GetRemoteStorage(remoteConf) + if getClientErr != nil { + return nil, fmt.Errorf("get remote client: %v", getClientErr) + } + + remoteStorageLocation := &filer_pb.RemoteStorageLocation{ + Name: req.RemoteName, + Bucket: req.RemoteBucket, + Path: req.RemotePath, + } + data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size) + if ReadRemoteErr != nil { + return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr) + } + + n := new(needle.Needle) + n.Id = types.NeedleId(req.NeedleId) + n.Cookie = types.Cookie(req.Cookie) + n.Data, n.DataSize = data, uint32(len(data)) + // copied from *Needle.prepareWriteBuffer() + n.Size = 4 + types.Size(n.DataSize) + 1 + n.Checksum = needle.NewCRC(n.Data) + if _, err = vs.store.WriteVolumeNeedle(v.Id, n, false); err != nil { + return nil, fmt.Errorf("write needle %d size %d: %v", req.NeedleId, req.Size, err) + } + + return resp, nil +} diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index f7359ea6b..034521b4b 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -17,6 +17,13 @@ import ( ) type VolumeServer struct { + inFlightUploadDataSize int64 + inFlightDownloadDataSize int64 + concurrentUploadLimit int64 + concurrentDownloadLimit int64 + inFlightUploadDataLimitCond *sync.Cond + inFlightDownloadDataLimitCond *sync.Cond + SeedMasterNodes []string currentMaster string pulseSeconds int @@ -28,17 +35,13 @@ type VolumeServer struct { needleMapKind storage.NeedleMapKind FixJpgOrientation bool - ReadRedirect bool + ReadMode string compactionBytePerSecond int64 metricsAddress string metricsIntervalSec int fileSizeLimitBytes int64 isHeartbeating bool stopChan chan bool - - inFlightDataSize int64 - inFlightDataLimitCond *sync.Cond - concurrentUploadLimit int64 } func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, @@ -50,10 +53,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, dataCenter string, rack string, whiteList []string, fixJpgOrientation bool, - readRedirect bool, + readMode string, compactionMBPerSecond int, fileSizeLimitMB int, concurrentUploadLimit int64, + concurrentDownloadLimit int64, ) *VolumeServer { v := util.GetViper() @@ -67,19 +71,21 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") vs := &VolumeServer{ - pulseSeconds: pulseSeconds, - dataCenter: dataCenter, - rack: rack, - needleMapKind: needleMapKind, - FixJpgOrientation: fixJpgOrientation, - ReadRedirect: readRedirect, - grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), - compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, - fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, - isHeartbeating: true, - stopChan: make(chan bool), - inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), - concurrentUploadLimit: concurrentUploadLimit, + pulseSeconds: pulseSeconds, + dataCenter: dataCenter, + rack: rack, + needleMapKind: needleMapKind, + FixJpgOrientation: fixJpgOrientation, + ReadMode: readMode, + grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"), + compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024, + fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024, + isHeartbeating: true, + stopChan: make(chan bool), + inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)), + inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)), + concurrentUploadLimit: concurrentUploadLimit, + concurrentDownloadLimit: concurrentDownloadLimit, } vs.SeedMasterNodes = masterNodes @@ -112,6 +118,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, return vs } +func (vs *VolumeServer) SetStopping() { + glog.V(0).Infoln("Stopping volume server...") + vs.store.SetStopping() +} + func (vs *VolumeServer) Shutdown() { glog.V(0).Infoln("Shutting down volume server...") vs.store.Close() diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go index 4527add44..ed7807bb8 100644 --- a/weed/server/volume_server_handlers.go +++ b/weed/server/volume_server_handlers.go @@ -37,6 +37,11 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque switch r.Method { case "GET", "HEAD": stats.ReadRequest() + vs.inFlightDownloadDataLimitCond.L.Lock() + for vs.concurrentDownloadLimit != 0 && atomic.LoadInt64(&vs.inFlightDownloadDataSize) > vs.concurrentDownloadLimit { + glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit) + vs.inFlightDownloadDataLimitCond.Wait() + } vs.GetOrHeadHandler(w, r) case "DELETE": stats.DeleteRequest() @@ -45,15 +50,16 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque // wait until in flight data is less than the limit contentLength := getContentLength(r) - vs.inFlightDataLimitCond.L.Lock() - for atomic.LoadInt64(&vs.inFlightDataSize) > vs.concurrentUploadLimit { - vs.inFlightDataLimitCond.Wait() + vs.inFlightUploadDataLimitCond.L.Lock() + for vs.concurrentUploadLimit != 0 && atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit { + glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit) + vs.inFlightUploadDataLimitCond.Wait() } - atomic.AddInt64(&vs.inFlightDataSize, contentLength) - vs.inFlightDataLimitCond.L.Unlock() + atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength) + vs.inFlightUploadDataLimitCond.L.Unlock() defer func() { - atomic.AddInt64(&vs.inFlightDataSize, -contentLength) - vs.inFlightDataLimitCond.Signal() + atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength) + vs.inFlightUploadDataLimitCond.Signal() }() // processs uploads diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 3e977cfd4..ae3c0b53f 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/types" "io" "mime" "net/http" @@ -12,6 +13,7 @@ import ( "path/filepath" "strconv" "strings" + "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -58,14 +60,53 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) hasVolume := vs.store.HasVolume(volumeId) _, hasEcVolume := vs.store.FindEcVolume(volumeId) if !hasVolume && !hasEcVolume { - if !vs.ReadRedirect { - glog.V(2).Infoln("volume is not local:", err, r.URL.Path) + if vs.ReadMode == "local" { + glog.V(0).Infoln("volume is not local:", err, r.URL.Path) w.WriteHeader(http.StatusNotFound) return } lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String()) glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err) - if err == nil && len(lookupResult.Locations) > 0 { + if err != nil || len(lookupResult.Locations) <= 0 { + glog.V(0).Infoln("lookup error:", err, r.URL.Path) + w.WriteHeader(http.StatusNotFound) + return + } + if vs.ReadMode == "proxy" { + // proxy client request to target server + u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url)) + r.URL.Host = u.Host + r.URL.Scheme = u.Scheme + request, err := http.NewRequest("GET", r.URL.String(), nil) + if err != nil { + glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err) + w.WriteHeader(http.StatusInternalServerError) + return + } + for k, vv := range r.Header { + for _, v := range vv { + request.Header.Add(k, v) + } + } + + response, err := client.Do(request) + if err != nil { + glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err) + w.WriteHeader(http.StatusInternalServerError) + return + } + defer util.CloseResponse(response) + // proxy target response to client + for k, vv := range response.Header { + for _, v := range vv { + w.Header().Add(k, v) + } + } + w.WriteHeader(response.StatusCode) + io.Copy(w, response.Body) + return + } else { + // redirect u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl)) u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid) arg := url.Values{} @@ -74,12 +115,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } u.RawQuery = arg.Encode() http.Redirect(w, r, u.String(), http.StatusMovedPermanently) - - } else { - glog.V(2).Infoln("lookup error:", err, r.URL.Path) - w.WriteHeader(http.StatusNotFound) + return } - return } cookie := n.Cookie @@ -88,11 +125,22 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } var count int + var needleSize types.Size + onReadSizeFn := func(size types.Size) { + needleSize = size + atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(needleSize)) + vs.inFlightDownloadDataLimitCond.L.Unlock() + } if hasVolume { - count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption) + count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption, onReadSizeFn) } else if hasEcVolume { - count, err = vs.store.ReadEcShardNeedle(volumeId, n) + count, err = vs.store.ReadEcShardNeedle(volumeId, n, onReadSizeFn) } + defer func() { + atomic.AddInt64(&vs.inFlightDownloadDataSize, -int64(needleSize)) + vs.inFlightDownloadDataLimitCond.Signal() + }() + if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume { glog.V(4).Infof("read needle: %v", err) // start to fix it from other replicas, if not deleted and hasVolume and is not a replicated request @@ -229,7 +277,7 @@ func conditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext strin } func shouldResizeImages(ext string, r *http.Request) (width, height int, mode string, shouldResize bool) { - if ext == ".png" || ext == ".jpg" || ext == ".jpeg" || ext == ".gif" { + if ext == ".png" || ext == ".jpg" || ext == ".jpeg" || ext == ".gif" || ext == ".webp" { if r.FormValue("width") != "" { width, _ = strconv.Atoi(r.FormValue("width")) } diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index 3d752eda6..aeb7d6e65 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -1,6 +1,7 @@ package weed_server import ( + "bytes" "errors" "fmt" "net/http" @@ -42,7 +43,10 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes) + bytesBuffer := bufPool.Get().(*bytes.Buffer) + defer bufPool.Put(bytesBuffer) + + reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes, bytesBuffer) if ne != nil { writeJsonError(w, r, http.StatusBadRequest, ne) return @@ -104,7 +108,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { return } - _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil) + _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil) if ok != nil { m := make(map[string]uint32) m["size"] = 0 diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go index 79f1a14a0..d85eb247a 100644 --- a/weed/server/volume_server_ui/templates.go +++ b/weed/server/volume_server_ui/templates.go @@ -1,6 +1,7 @@ package volume_server_ui import ( + _ "embed" "fmt" "github.com/chrislusf/seaweedfs/weed/util" "html/template" @@ -24,193 +25,10 @@ var funcMap = template.FuncMap{ "join": join, "bytesToHumanReadable": util.BytesToHumanReadable, "percentFrom": percentFrom, + "isNotEmpty": util.IsNotEmpty, } -var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html> -<html> - <head> - <title>SeaweedFS {{ .Version }}</title> - <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> - <script type="text/javascript" src="/seaweedfsstatic/javascript/jquery-3.6.0.min.js"></script> - <script type="text/javascript" src="/seaweedfsstatic/javascript/jquery-sparklines/2.1.2/jquery.sparkline.min.js"></script> - <script type="text/javascript"> - $(function() { - var periods = ['second', 'minute', 'hour', 'day']; - for (i = 0; i < periods.length; i++) { - var period = periods[i]; - $('.inlinesparkline-'+period).sparkline('html', { - type: 'line', - barColor: 'red', - tooltipSuffix:' request per '+period, - }); - } - }); - </script> - <style> - #jqstooltip{ - height: 28px !important; - width: 150px !important; - } - </style> - </head> - <body> - <div class="container"> - <div class="page-header"> - <h1> - <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a> - SeaweedFS <small>{{ .Version }}</small> - </h1> - </div> - - <div class="row"> - <div class="col-sm-6"> - <h2>Disk Stats</h2> - <table class="table table-striped"> - <thead> - <tr> - <th>Path</th> - <th>Disk</th> - <th>Total</th> - <th>Free</th> - <th>Usage</th> - </tr> - </thead> - <tbody> - {{ range .DiskStatuses }} - <tr> - <td>{{ .Dir }}</td> - <td>{{ .DiskType }}</td> - <td>{{ bytesToHumanReadable .All }}</td> - <td>{{ bytesToHumanReadable .Free }}</td> - <td>{{ percentFrom .All .Used}}%</td> - </tr> - {{ end }} - </tbody> - </table> - </div> - - <div class="col-sm-6"> - <h2>System Stats</h2> - <table class="table table-condensed table-striped"> - <tr> - <th>Masters</th> - <td>{{.Masters}}</td> - </tr> - <tr> - <th>Weekly # ReadRequests</th> - <td><span class="inlinesparkline-day">{{ .Counters.ReadRequests.WeekCounter.ToList | join }}</span></td> - </tr> - <tr> - <th>Daily # ReadRequests</th> - <td><span class="inlinesparkline-hour">{{ .Counters.ReadRequests.DayCounter.ToList | join }}</span></td> - </tr> - <tr> - <th>Hourly # ReadRequests</th> - <td><span class="inlinesparkline-minute">{{ .Counters.ReadRequests.HourCounter.ToList | join }}</span></td> - </tr> - <tr> - <th>Last Minute # ReadRequests</th> - <td><span class="inlinesparkline-second">{{ .Counters.ReadRequests.MinuteCounter.ToList | join }}</span></td> - </tr> - {{ range $key, $val := .Stats }} - <tr> - <th>{{ $key }}</th> - <td>{{ $val }}</td> - </tr> - {{ end }} - </table> - </div> - </div> - - <div class="row"> - <h2>Volumes</h2> - <table class="table table-striped"> - <thead> - <tr> - <th>Id</th> - <th>Collection</th> - <th>Disk</th> - <th>Data Size</th> - <th>Files</th> - <th>Trash</th> - <th>TTL</th> - <th>ReadOnly</th> - </tr> - </thead> - <tbody> - {{ range .Volumes }} - <tr> - <td><code>{{ .Id }}</code></td> - <td>{{ .Collection }}</td> - <td>{{ .DiskType }}</td> - <td>{{ bytesToHumanReadable .Size }}</td> - <td>{{ .FileCount }}</td> - <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td> - <td>{{ .Ttl }}</td> - <td>{{ .ReadOnly }}</td> - </tr> - {{ end }} - </tbody> - </table> - </div> - - <div class="row"> - <h2>Remote Volumes</h2> - <table class="table table-striped"> - <thead> - <tr> - <th>Id</th> - <th>Collection</th> - <th>Size</th> - <th>Files</th> - <th>Trash</th> - <th>Remote</th> - <th>Key</th> - </tr> - </thead> - <tbody> - {{ range .RemoteVolumes }} - <tr> - <td><code>{{ .Id }}</code></td> - <td>{{ .Collection }}</td> - <td>{{ bytesToHumanReadable .Size }}</td> - <td>{{ .FileCount }}</td> - <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td> - <td>{{ .RemoteStorageName }}</td> - <td>{{ .RemoteStorageKey }}</td> - </tr> - {{ end }} - </tbody> - </table> - </div> - - <div class="row"> - <h2>Erasure Coding Shards</h2> - <table class="table table-striped"> - <thead> - <tr> - <th>Id</th> - <th>Collection</th> - <th>Shard Size</th> - <th>Shards</th> - <th>CreatedAt</th> - </tr> - </thead> - <tbody> - {{ range .EcVolumes }} - <tr> - <td><code>{{ .VolumeId }}</code></td> - <td>{{ .Collection }}</td> - <td>{{ bytesToHumanReadable .ShardSize }}</td> - <td>{{ .ShardIdList }}</td> - <td>{{ .CreatedAt.Format "02 Jan 06 15:04 -0700" }}</td> - </tr> - {{ end }} - </tbody> - </table> - </div> +//go:embed volume.html +var volumeHtml string - </div> - </body> -</html> -`)) +var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(volumeHtml)) diff --git a/weed/server/volume_server_ui/volume.html b/weed/server/volume_server_ui/volume.html new file mode 100644 index 000000000..91809beb0 --- /dev/null +++ b/weed/server/volume_server_ui/volume.html @@ -0,0 +1,197 @@ +<!DOCTYPE html> +<html> +<head> + <title>SeaweedFS {{ .Version }}</title> + <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> + <script type="text/javascript" src="/seaweedfsstatic/javascript/jquery-3.6.0.min.js"></script> + <script type="text/javascript" + src="/seaweedfsstatic/javascript/jquery-sparklines/2.1.2/jquery.sparkline.min.js"></script> + <script type="text/javascript"> + $(function () { + var periods = ['second', 'minute', 'hour', 'day']; + for (i = 0; i < periods.length; i++) { + var period = periods[i]; + $('.inlinesparkline-' + period).sparkline('html', { + type: 'line', + barColor: 'red', + tooltipSuffix: ' request per ' + period, + }); + } + }); + </script> + <style> + #jqstooltip { + height: 28px !important; + width: 150px !important; + } + </style> +</head> +<body> +<div class="container"> + <div class="page-header"> + <h1> + <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a> + SeaweedFS <small>{{ .Version }}</small> + </h1> + </div> + + <div class="row"> + <div class="col-sm-6"> + <h2>Disk Stats</h2> + <table class="table table-striped"> + <thead> + <tr> + <th>Path</th> + <th>Disk</th> + <th>Total</th> + <th>Free</th> + <th>Usage</th> + </tr> + </thead> + <tbody> + {{ range .DiskStatuses }} + <tr> + <td>{{ .Dir }}</td> + <td>{{ .DiskType }}</td> + <td>{{ bytesToHumanReadable .All }}</td> + <td>{{ bytesToHumanReadable .Free }}</td> + <td>{{ percentFrom .All .Used}}%</td> + </tr> + {{ end }} + </tbody> + </table> + </div> + + <div class="col-sm-6"> + <h2>System Stats</h2> + <table class="table table-condensed table-striped"> + <tr> + <th>Masters</th> + <td>{{.Masters}}</td> + </tr> + <tr> + <th>Weekly # ReadRequests</th> + <td><span class="inlinesparkline-day">{{ .Counters.ReadRequests.WeekCounter.ToList | join }}</span> + </td> + </tr> + <tr> + <th>Daily # ReadRequests</th> + <td><span class="inlinesparkline-hour">{{ .Counters.ReadRequests.DayCounter.ToList | join }}</span> + </td> + </tr> + <tr> + <th>Hourly # ReadRequests</th> + <td><span + class="inlinesparkline-minute">{{ .Counters.ReadRequests.HourCounter.ToList | join }}</span> + </td> + </tr> + <tr> + <th>Last Minute # ReadRequests</th> + <td><span + class="inlinesparkline-second">{{ .Counters.ReadRequests.MinuteCounter.ToList | join }}</span> + </td> + </tr> + {{ range $key, $val := .Stats }} + <tr> + <th>{{ $key }}</th> + <td>{{ $val }}</td> + </tr> + {{ end }} + </table> + </div> + </div> + + <div class="row"> + <h2>Volumes</h2> + <table class="table table-striped"> + <thead> + <tr> + <th>Id</th> + <th>Collection</th> + <th>Disk</th> + <th>Data Size</th> + <th>Files</th> + <th>Trash</th> + <th>TTL</th> + <th>ReadOnly</th> + </tr> + </thead> + <tbody> + {{ range .Volumes }} + <tr> + <td><code>{{ .Id }}</code></td> + <td>{{ .Collection }}</td> + <td>{{ .DiskType }}</td> + <td>{{ bytesToHumanReadable .Size }}</td> + <td>{{ .FileCount }}</td> + <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td> + <td>{{ .Ttl }}</td> + <td>{{ .ReadOnly }}</td> + </tr> + {{ end }} + </tbody> + </table> + </div> + + {{ if isNotEmpty .RemoteVolumes }} + <div class="row"> + <h2>Remote Volumes</h2> + <table class="table table-striped"> + <thead> + <tr> + <th>Id</th> + <th>Collection</th> + <th>Size</th> + <th>Files</th> + <th>Trash</th> + <th>Remote</th> + <th>Key</th> + </tr> + </thead> + <tbody> + {{ range .RemoteVolumes }} + <tr> + <td><code>{{ .Id }}</code></td> + <td>{{ .Collection }}</td> + <td>{{ bytesToHumanReadable .Size }}</td> + <td>{{ .FileCount }}</td> + <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td> + <td>{{ .RemoteStorageName }}</td> + <td>{{ .RemoteStorageKey }}</td> + </tr> + {{ end }} + </tbody> + </table> + </div> + {{ end }} + + {{ if isNotEmpty .EcVolumes }} + <div class="row"> + <h2>Erasure Coding Shards</h2> + <table class="table table-striped"> + <thead> + <tr> + <th>Id</th> + <th>Collection</th> + <th>Shard Size</th> + <th>Shards</th> + <th>CreatedAt</th> + </tr> + </thead> + <tbody> + {{ range .EcVolumes }} + <tr> + <td><code>{{ .VolumeId }}</code></td> + <td>{{ .Collection }}</td> + <td>{{ bytesToHumanReadable .ShardSize }}</td> + <td>{{ .ShardIdList }}</td> + <td>{{ .CreatedAt.Format "02 Jan 06 15:04 -0700" }}</td> + </tr> + {{ end }} + </tbody> + </table> + </div> + {{ end }} +</div> +</body> +</html> diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index c6550a36f..68c1f3233 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -532,7 +532,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { return 0, io.EOF } if f.entryViewCache == nil { - f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks) + f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, math.MaxInt64) f.reader = nil } if f.reader == nil { |
