aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorBl1tz23 <alex3angle@gmail.com>2021-08-10 13:45:24 +0300
committerBl1tz23 <alex3angle@gmail.com>2021-08-10 13:45:24 +0300
commit1c94b3d01340baad000188550fcf2ccab6ca80e5 (patch)
tree12c3da17eb2d1a43fef78021a3d7c79110b0ff5f /weed/server
parente6e57db530217ff57b3622b4672b03ebb6313e96 (diff)
parentf9cf9b93d32a2b01bc4d95ce7d24d86ef60be668 (diff)
downloadseaweedfs-1c94b3d01340baad000188550fcf2ccab6ca80e5.tar.xz
seaweedfs-1c94b3d01340baad000188550fcf2ccab6ca80e5.zip
merge master, resolve conflicts
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go6
-rw-r--r--weed/server/filer_grpc_server.go83
-rw-r--r--weed/server/filer_grpc_server_remote.go162
-rw-r--r--weed/server/filer_grpc_server_rename.go19
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go94
-rw-r--r--weed/server/filer_server.go4
-rw-r--r--weed/server/filer_server_handlers.go12
-rw-r--r--weed/server/filer_server_handlers_read.go22
-rw-r--r--weed/server/filer_server_handlers_write.go26
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go6
-rw-r--r--weed/server/filer_server_handlers_write_cipher.go6
-rw-r--r--weed/server/filer_server_handlers_write_upload.go169
-rw-r--r--weed/server/filer_ui/filer.html182
-rw-r--r--weed/server/filer_ui/templates.go178
-rw-r--r--weed/server/master_grpc_server.go4
-rw-r--r--weed/server/master_grpc_server_volume.go2
-rw-r--r--weed/server/master_server.go11
-rw-r--r--weed/server/master_server_handlers.go2
-rw-r--r--weed/server/master_ui/master.html110
-rw-r--r--weed/server/master_ui/templates.go112
-rw-r--r--weed/server/volume_grpc_admin.go4
-rw-r--r--weed/server/volume_grpc_batch_delete.go5
-rw-r--r--weed/server/volume_grpc_query.go2
-rw-r--r--weed/server/volume_grpc_remote.go56
-rw-r--r--weed/server/volume_server.go49
-rw-r--r--weed/server/volume_server_handlers.go20
-rw-r--r--weed/server/volume_server_handlers_read.go70
-rw-r--r--weed/server/volume_server_handlers_write.go8
-rw-r--r--weed/server/volume_server_ui/templates.go192
-rw-r--r--weed/server/volume_server_ui/volume.html197
-rw-r--r--weed/server/webdav_server.go2
31 files changed, 1114 insertions, 701 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index 571944c10..2cd2276eb 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "bytes"
"encoding/json"
"errors"
"fmt"
@@ -104,7 +105,9 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
}
debug("parsing upload file...")
- pu, pe := needle.ParseUpload(r, 256*1024*1024)
+ bytesBuffer := bufPool.Get().(*bytes.Buffer)
+ defer bufPool.Put(bytesBuffer)
+ pu, pe := needle.ParseUpload(r, 256*1024*1024, bytesBuffer)
if pe != nil {
writeJsonError(w, r, http.StatusBadRequest, pe)
return
@@ -277,6 +280,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10))
w.Header().Set("Content-Range", ra.contentRange(totalSize))
+ w.WriteHeader(http.StatusPartialContent)
err = writeFn(w, ra.start, ra.length)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 3821de6a9..08b01dd09 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -31,16 +31,7 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
}
return &filer_pb.LookupDirectoryEntryResponse{
- Entry: &filer_pb.Entry{
- Name: req.Name,
- IsDirectory: entry.IsDirectory(),
- Attributes: filer.EntryAttributeToPb(entry),
- Chunks: entry.Chunks,
- Extended: entry.Extended,
- HardLinkId: entry.HardLinkId,
- HardLinkCounter: entry.HardLinkCounter,
- Content: entry.Content,
- },
+ Entry: entry.ToProtoEntry(),
}, nil
}
@@ -66,16 +57,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName, listErr = fs.filer.StreamListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, int64(paginationLimit), req.Prefix, "", "", func(entry *filer.Entry) bool {
hasEntries = true
if err = stream.Send(&filer_pb.ListEntriesResponse{
- Entry: &filer_pb.Entry{
- Name: entry.Name(),
- IsDirectory: entry.IsDirectory(),
- Chunks: entry.Chunks,
- Attributes: filer.EntryAttributeToPb(entry),
- Extended: entry.Extended,
- HardLinkId: entry.HardLinkId,
- HardLinkCounter: entry.HardLinkCounter,
- Content: entry.Content,
- },
+ Entry: entry.ToProtoEntry(),
}); err != nil {
return false
}
@@ -161,15 +143,10 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
}
- createErr := fs.filer.CreateEntry(ctx, &filer.Entry{
- FullPath: util.JoinPath(req.Directory, req.Entry.Name),
- Attr: filer.PbToEntryAttribute(req.Entry.Attributes),
- Chunks: chunks,
- Extended: req.Entry.Extended,
- HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
- HardLinkCounter: req.Entry.HardLinkCounter,
- Content: req.Entry.Content,
- }, req.OExcl, req.IsFromOtherCluster, req.Signatures)
+ newEntry := filer.FromPbEntry(req.Directory, req.Entry)
+ newEntry.Chunks = chunks
+
+ createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures)
if createErr == nil {
fs.filer.DeleteChunks(garbage)
@@ -196,35 +173,8 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
}
- newEntry := &filer.Entry{
- FullPath: util.JoinPath(req.Directory, req.Entry.Name),
- Attr: entry.Attr,
- Extended: req.Entry.Extended,
- Chunks: chunks,
- HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
- HardLinkCounter: req.Entry.HardLinkCounter,
- Content: req.Entry.Content,
- }
-
- glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v",
- fullpath, entry.Attr, len(entry.Chunks), entry.Chunks,
- req.Entry.Attributes, len(req.Entry.Chunks), req.Entry.Chunks,
- entry.Extended, req.Entry.Extended)
-
- if req.Entry.Attributes != nil {
- if req.Entry.Attributes.Mtime != 0 {
- newEntry.Attr.Mtime = time.Unix(req.Entry.Attributes.Mtime, 0)
- }
- if req.Entry.Attributes.FileMode != 0 {
- newEntry.Attr.Mode = os.FileMode(req.Entry.Attributes.FileMode)
- }
- newEntry.Attr.Uid = req.Entry.Attributes.Uid
- newEntry.Attr.Gid = req.Entry.Attributes.Gid
- newEntry.Attr.Mime = req.Entry.Attributes.Mime
- newEntry.Attr.UserName = req.Entry.Attributes.UserName
- newEntry.Attr.GroupNames = req.Entry.Attributes.GroupName
-
- }
+ newEntry := filer.FromPbEntry(req.Directory, req.Entry)
+ newEntry.Chunks = chunks
if filer.EqualEntry(entry, newEntry) {
return &filer_pb.UpdateEntryResponse{}, err
@@ -259,14 +209,14 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
garbage = append(garbage, coveredChunks...)
if newEntry.Attributes != nil {
- so := fs.detectStorageOption(fullpath,
+ so, _ := fs.detectStorageOption(fullpath,
newEntry.Attributes.Collection,
newEntry.Attributes.Replication,
newEntry.Attributes.TtlSec,
newEntry.Attributes.DiskType,
"",
"",
- )
+ ) // ignore readonly error for capacity needed to manifestize
chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), chunks)
if err != nil {
// not good, but should be ok
@@ -307,7 +257,11 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
}
entry.Chunks = append(entry.Chunks, req.Chunks...)
- so := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "")
+ so, err := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "")
+ if err != nil {
+ glog.Warningf("detectStorageOption: %v", err)
+ return &filer_pb.AppendToEntryResponse{}, err
+ }
entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(so), entry.Chunks)
if err != nil {
// not good, but should be ok
@@ -333,7 +287,11 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
- so := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack)
+ so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack)
+ if err != nil {
+ glog.V(3).Infof("AssignVolume: %v", err)
+ return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
+ }
assignRequest, altRequest := so.ToAssignRequests(int(req.Count))
@@ -436,6 +394,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
Signature: fs.filer.Signature,
MetricsAddress: fs.metricsAddress,
MetricsIntervalSec: int32(fs.metricsIntervalSec),
+ Version: util.Version(),
}
glog.V(4).Infof("GetFilerConfiguration: %v", t)
diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go
new file mode 100644
index 000000000..2cbfd3319
--- /dev/null
+++ b/weed/server/filer_grpc_server_remote.go
@@ -0,0 +1,162 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/golang/protobuf/proto"
+ "strings"
+ "time"
+)
+
+func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.DownloadToLocalRequest) (*filer_pb.DownloadToLocalResponse, error) {
+
+ // load all mappings
+ mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
+ if err != nil {
+ return nil, err
+ }
+ mappings, err := filer.UnmarshalRemoteStorageMappings(mappingEntry.Content)
+ if err != nil {
+ return nil, err
+ }
+
+ // find mapping
+ var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation
+ var localMountedDir string
+ for k, loc := range mappings.Mappings {
+ if strings.HasPrefix(req.Directory, k) {
+ localMountedDir, remoteStorageMountedLocation = k, loc
+ }
+ }
+ if localMountedDir == "" {
+ return nil, fmt.Errorf("%s is not mounted", req.Directory)
+ }
+
+ // find storage configuration
+ storageConfEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX))
+ if err != nil {
+ return nil, err
+ }
+ storageConf := &filer_pb.RemoteConf{}
+ if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
+ return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
+ }
+
+ // find the entry
+ entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name))
+ if err == filer_pb.ErrNotFound {
+ return nil, err
+ }
+
+ resp := &filer_pb.DownloadToLocalResponse{}
+ if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
+ return resp, nil
+ }
+
+ // detect storage option
+ so, err := fs.detectStorageOption(req.Directory, "", "000", 0, "", "", "")
+ if err != nil {
+ return resp, err
+ }
+ assignRequest, altRequest := so.ToAssignRequests(1)
+
+ // find a good chunk size
+ chunkSize := int64(5 * 1024 * 1024)
+ chunkCount := entry.Remote.RemoteSize/chunkSize + 1
+ for chunkCount > 1000 && chunkSize < int64(fs.option.MaxMB)*1024*1024/2 {
+ chunkSize *= 2
+ chunkCount = entry.Remote.RemoteSize/chunkSize + 1
+ }
+
+ dest := util.FullPath(remoteStorageMountedLocation.Path).Child(string(util.FullPath(req.Directory).Child(req.Name))[len(localMountedDir):])
+
+ var chunks []*filer_pb.FileChunk
+
+ // FIXME limit on parallel
+ for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
+ size := chunkSize
+ if offset+chunkSize > entry.Remote.RemoteSize {
+ size = entry.Remote.RemoteSize - offset
+ }
+
+ // assign one volume server
+ assignResult, err := operation.Assign(fs.filer.GetMaster, fs.grpcDialOption, assignRequest, altRequest)
+ if err != nil {
+ return resp, err
+ }
+ if assignResult.Error != "" {
+ return resp, fmt.Errorf("assign: %v", assignResult.Error)
+ }
+ fileId, parseErr := needle.ParseFileIdFromString(assignResult.Fid)
+ if assignResult.Error != "" {
+ return resp, fmt.Errorf("unrecognized file id %s: %v", assignResult.Fid, parseErr)
+ }
+
+ // tell filer to tell volume server to download into needles
+ err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
+ VolumeId: uint32(fileId.VolumeId),
+ NeedleId: uint64(fileId.Key),
+ Cookie: uint32(fileId.Cookie),
+ Offset: offset,
+ Size: size,
+ RemoteType: storageConf.Type,
+ RemoteName: storageConf.Name,
+ S3AccessKey: storageConf.S3AccessKey,
+ S3SecretKey: storageConf.S3SecretKey,
+ S3Region: storageConf.S3Region,
+ S3Endpoint: storageConf.S3Endpoint,
+ RemoteBucket: remoteStorageMountedLocation.Bucket,
+ RemotePath: string(dest),
+ })
+ if fetchAndWriteErr != nil {
+ return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
+ }
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ chunks = append(chunks, &filer_pb.FileChunk{
+ FileId: assignResult.Fid,
+ Offset: offset,
+ Size: uint64(size),
+ Mtime: time.Now().Unix(),
+ Fid: &filer_pb.FileId{
+ VolumeId: uint32(fileId.VolumeId),
+ FileKey: uint64(fileId.Key),
+ Cookie: uint32(fileId.Cookie),
+ },
+ })
+
+ }
+
+ garbage := entry.Chunks
+
+ newEntry := entry.ShallowClone()
+ newEntry.Chunks = chunks
+ newEntry.Remote = proto.Clone(entry.Remote).(*filer_pb.RemoteEntry)
+ newEntry.Remote.LocalMtime = time.Now().Unix()
+
+ // this skips meta data log events
+
+ if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
+ return nil, err
+ }
+ fs.filer.DeleteChunks(garbage)
+
+ fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, false, nil)
+
+ resp.Entry = newEntry.ToProtoEntry()
+
+ return resp, nil
+
+}
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index eadb970d5..8a11c91e3 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -33,7 +33,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
}
- moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName)
+ moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, req.Signatures)
if moveErr != nil {
fs.filer.RollbackTransaction(ctx)
return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
@@ -47,23 +47,23 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return &filer_pb.AtomicRenameEntryResponse{}, nil
}
-func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
+func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
if entry.IsDirectory() {
- if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName); err != nil {
+ if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, signatures); err != nil {
return err
}
}
return nil
- }); err != nil {
+ }, signatures); err != nil {
return fmt.Errorf("fail to move %s => %s: %v", oldParent.Child(entry.Name()), newParent.Child(newName), err)
}
return nil
}
-func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string) error {
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
currentDirPath := oldParent.Child(entry.Name())
newDirPath := newParent.Child(newName)
@@ -84,7 +84,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
for _, item := range entries {
lastFileName = item.Name()
// println("processing", lastFileName)
- err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name())
+ err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), signatures)
if err != nil {
return err
}
@@ -96,7 +96,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
return nil
}
-func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error) error {
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error {
oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
@@ -115,8 +115,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
Extended: entry.Extended,
Content: entry.Content,
}
- createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, nil)
- if createErr != nil {
+ if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures); createErr != nil {
return createErr
}
@@ -127,7 +126,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
}
// delete old entry
- deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, nil)
+ deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, signatures)
if deleteErr != nil {
return deleteErr
}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index d9f91b125..3fdac1b26 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -2,7 +2,6 @@ package weed_server
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"strings"
"time"
@@ -12,6 +11,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+)
+
+const (
+ // MaxUnsyncedEvents send empty notification with timestamp when certain amount of events have been filtered
+ MaxUnsyncedEvents = 1e3
)
func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
@@ -25,43 +30,54 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
var processedTsNs int64
- var err error
+ var readPersistedLogErr error
+ var readInMemoryLogErr error
for {
- processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
+ glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+
+ processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if readPersistedLogErr != nil {
+ return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
+ } else {
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+ time.Sleep(1127 * time.Millisecond)
+ continue
+ }
}
- lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+
+ lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock()
return true
}, eachLogEntryFn)
- if err != nil {
- if err == log_buffer.ResumeFromDiskError {
+ if readInMemoryLogErr != nil {
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
continue
}
- glog.Errorf("processed to %v: %v", lastReadTime, err)
- time.Sleep(3127 * time.Millisecond)
- if err != log_buffer.ResumeError {
+ glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
+ if readInMemoryLogErr != log_buffer.ResumeError {
break
}
}
+
+ time.Sleep(1127 * time.Millisecond)
}
- return err
+ return readInMemoryLogErr
}
@@ -76,46 +92,52 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName)
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
var processedTsNs int64
- var err error
+ var readPersistedLogErr error
+ var readInMemoryLogErr error
for {
// println("reading from persisted logs ...")
- processedTsNs, err = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
+ glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if readPersistedLogErr != nil {
+ return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
+ } else {
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
+ time.Sleep(1127 * time.Millisecond)
+ continue
+ }
}
- // glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- // println("reading from in memory logs ...")
+ glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
return true
}, eachLogEntryFn)
- if err != nil {
- if err == log_buffer.ResumeFromDiskError {
+ if readInMemoryLogErr != nil {
+ if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
continue
}
- glog.Errorf("processed to %v: %v", lastReadTime, err)
- time.Sleep(3127 * time.Millisecond)
- if err != log_buffer.ResumeError {
+ glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
+ time.Sleep(1127 * time.Millisecond)
+ if readInMemoryLogErr != log_buffer.ResumeError {
break
}
}
}
- return err
+ return readInMemoryLogErr
}
@@ -135,12 +157,25 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati
}
}
-func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+ filtered := 0
+
return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+ defer func() {
+ if filtered > MaxUnsyncedEvents {
+ if err := stream.Send(&filer_pb.SubscribeMetadataResponse{
+ EventNotification: &filer_pb.EventNotification{},
+ TsNs: tsNs,
+ }); err == nil {
+ filtered = 0
+ }
+ }
+ }()
+ filtered++
foundSelf := false
for _, sig := range eventNotification.Signatures {
- if sig == clientSignature && clientSignature != 0 {
+ if sig == req.Signature && req.Signature != 0 {
return nil
}
if sig == fs.filer.Signature {
@@ -187,6 +222,7 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
glog.V(0).Infof("=> client %v: %+v", clientName, err)
return err
}
+ filtered = 0
return nil
}
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index dfb43c706..534bc4840 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -30,11 +30,11 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
_ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
- _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
@@ -149,6 +149,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
fs.filer.LoadFilerConf()
+ fs.filer.LoadRemoteStorageConfAndMapping()
+
grace.OnInterrupt(func() {
fs.filer.Shutdown()
})
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index ed6bbb6f6..0389e1e18 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"strings"
@@ -34,11 +35,11 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
switch r.Method {
case "GET":
stats.FilerRequestCounter.WithLabelValues("get").Inc()
- fs.GetOrHeadHandler(w, r, true)
+ fs.GetOrHeadHandler(w, r)
stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
case "HEAD":
stats.FilerRequestCounter.WithLabelValues("head").Inc()
- fs.GetOrHeadHandler(w, r, false)
+ fs.GetOrHeadHandler(w, r)
stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
case "DELETE":
stats.FilerRequestCounter.WithLabelValues("delete").Inc()
@@ -53,7 +54,8 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
// wait until in flight data is less than the limit
contentLength := getContentLength(r)
fs.inFlightDataLimitCond.L.Lock()
- for atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit {
+ for fs.option.ConcurrentUploadLimit != 0 && atomic.LoadInt64(&fs.inFlightDataSize) > fs.option.ConcurrentUploadLimit {
+ glog.V(4).Infof("wait because inflight data %d > %d", fs.inFlightDataSize, fs.option.ConcurrentUploadLimit)
fs.inFlightDataLimitCond.Wait()
}
atomic.AddInt64(&fs.inFlightDataSize, contentLength)
@@ -93,11 +95,11 @@ func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Reque
switch r.Method {
case "GET":
stats.FilerRequestCounter.WithLabelValues("get").Inc()
- fs.GetOrHeadHandler(w, r, true)
+ fs.GetOrHeadHandler(w, r)
stats.FilerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds())
case "HEAD":
stats.FilerRequestCounter.WithLabelValues("head").Inc()
- fs.GetOrHeadHandler(w, r, false)
+ fs.GetOrHeadHandler(w, r)
stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
case "OPTIONS":
stats.FilerRequestCounter.WithLabelValues("options").Inc()
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index ea0650ed8..fc9cacf39 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -3,6 +3,7 @@ package weed_server
import (
"bytes"
"context"
+ "fmt"
"io"
"mime"
"net/http"
@@ -21,7 +22,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, isGetMethod bool) {
+func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
isForDirectory := strings.HasSuffix(path, "/")
@@ -40,7 +41,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc()
w.WriteHeader(http.StatusNotFound)
} else {
- glog.V(0).Infof("Internal %s: %v", path, err)
+ glog.Errorf("Internal %s: %v", path, err)
stats.FilerRequestCounter.WithLabelValues("read.internalerror").Inc()
w.WriteHeader(http.StatusInternalServerError)
}
@@ -101,7 +102,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
//Seaweed custom header are not visible to Vue or javascript
seaweedHeaders := []string{}
- for header, _ := range w.Header() {
+ for header := range w.Header() {
if strings.HasPrefix(header, "Seaweed-") {
seaweedHeaders = append(seaweedHeaders, header)
}
@@ -163,7 +164,20 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
return err
}
- err = filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
+ chunks := entry.Chunks
+ if entry.IsInRemoteOnly() {
+ dir, name := entry.FullPath.DirAndName()
+ if resp, err := fs.DownloadToLocal(context.Background(), &filer_pb.DownloadToLocalRequest{
+ Directory: dir,
+ Name: name,
+ }); err != nil {
+ return fmt.Errorf("cache %s: %v", entry.FullPath, err)
+ } else {
+ chunks = resp.Entry.Chunks
+ }
+ }
+
+ err = filer.StreamContent(fs.filer.MasterClient, writer, chunks, offset, size)
if err != nil {
glog.Errorf("failed to stream content %s: %v", r.URL, err)
}
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 95eba9d3d..8d11e664a 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -2,6 +2,7 @@ package weed_server
import (
"context"
+ "errors"
"net/http"
"os"
"strings"
@@ -19,14 +20,14 @@ import (
var (
OS_UID = uint32(os.Getuid())
OS_GID = uint32(os.Getgid())
+
+ ErrReadOnly = errors.New("read only")
)
type FilerPostResult struct {
Name string `json:"name,omitempty"`
Size int64 `json:"size,omitempty"`
Error string `json:"error,omitempty"`
- Fid string `json:"fid,omitempty"`
- Url string `json:"url,omitempty"`
}
func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
@@ -57,7 +58,7 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
ctx := context.Background()
query := r.URL.Query()
- so := fs.detectStorageOption0(r.RequestURI,
+ so, err := fs.detectStorageOption0(r.RequestURI,
query.Get("collection"),
query.Get("replication"),
query.Get("ttl"),
@@ -65,6 +66,15 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
query.Get("dataCenter"),
query.Get("rack"),
)
+ if err != nil {
+ if err == ErrReadOnly {
+ w.WriteHeader(http.StatusInsufficientStorage)
+ } else {
+ glog.V(1).Infoln("post", r.RequestURI, ":", err.Error())
+ w.WriteHeader(http.StatusInternalServerError)
+ }
+ return
+ }
fs.autoChunk(ctx, w, r, contentLength, so)
util.CloseRequest(r)
@@ -105,7 +115,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
-func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType string, dataCenter, rack string) *operation.StorageOption {
+func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack string) (*operation.StorageOption, error) {
collection := util.Nvl(qCollection, fs.option.Collection)
replication := util.Nvl(qReplication, fs.option.DefaultReplication)
@@ -121,6 +131,10 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
+ if rule.ReadOnly {
+ return nil, ErrReadOnly
+ }
+
if ttlSeconds == 0 {
ttl, err := needle.ReadTTL(rule.GetTtl())
if err != nil {
@@ -138,10 +152,10 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
DiskType: util.Nvl(diskType, rule.DiskType),
Fsync: fsync || rule.Fsync,
VolumeGrowthCount: rule.VolumeGrowthCount,
- }
+ }, nil
}
-func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) *operation.StorageOption {
+func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) (*operation.StorageOption, error) {
ttl, err := needle.ReadTTL(qTtl)
if err != nil {
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index c43922ea9..a42e0fc97 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -214,11 +214,7 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
Size: int64(entry.FileSize),
}
- if entry.Extended == nil {
- entry.Extended = make(map[string][]byte)
- }
-
- SaveAmzMetaData(r, entry.Extended, false)
+ entry.Extended = SaveAmzMetaData(r, entry.Extended, false)
for k, v := range r.Header {
if len(v) > 0 && (strings.HasPrefix(k, needle.PairNamePrefix) || k == "Cache-Control" || k == "Expires") {
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 8334d1618..acaa8f5ab 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "bytes"
"context"
"fmt"
"net/http"
@@ -30,7 +31,10 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
sizeLimit := int64(fs.option.MaxMB) * 1024 * 1024
- pu, err := needle.ParseUpload(r, sizeLimit)
+ bytesBuffer := bufPool.Get().(*bytes.Buffer)
+ defer bufPool.Put(bytesBuffer)
+
+ pu, err := needle.ParseUpload(r, sizeLimit, bytesBuffer)
uncompressedData := pu.Data
if pu.IsGzipped {
uncompressedData = pu.UncompressedData
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 540def563..2275ff1bc 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -7,7 +7,10 @@ import (
"io"
"io/ioutil"
"net/http"
+ "sort"
"strings"
+ "sync"
+ "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -19,85 +22,104 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
- var fileChunks []*filer_pb.FileChunk
+var bufPool = sync.Pool{
+ New: func() interface{} {
+ return new(bytes.Buffer)
+ },
+}
- md5Hash := md5.New()
- var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
- chunkOffset := int64(0)
- var smallContent []byte
+ md5Hash = md5.New()
+ var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+ var wg sync.WaitGroup
+ var bytesBufferCounter int64
+ bytesBufferLimitCond := sync.NewCond(new(sync.Mutex))
+ var fileChunksLock sync.Mutex
for {
+
+ // need to throttle used byte buffer
+ bytesBufferLimitCond.L.Lock()
+ for atomic.LoadInt64(&bytesBufferCounter) >= 4 {
+ glog.V(4).Infof("waiting for byte buffer %d", bytesBufferCounter)
+ bytesBufferLimitCond.Wait()
+ }
+ atomic.AddInt64(&bytesBufferCounter, 1)
+ bytesBufferLimitCond.L.Unlock()
+
+ bytesBuffer := bufPool.Get().(*bytes.Buffer)
+ glog.V(4).Infof("received byte buffer %d", bytesBufferCounter)
+
limitedReader := io.LimitReader(partReader, int64(chunkSize))
- data, err := ioutil.ReadAll(limitedReader)
- if err != nil {
- return nil, nil, 0, err, nil
+ bytesBuffer.Reset()
+
+ dataSize, err := bytesBuffer.ReadFrom(limitedReader)
+
+ // data, err := ioutil.ReadAll(limitedReader)
+ if err != nil || dataSize == 0 {
+ bufPool.Put(bytesBuffer)
+ atomic.AddInt64(&bytesBufferCounter, -1)
+ bytesBufferLimitCond.Signal()
+ break
}
if chunkOffset == 0 && !isAppend(r) {
- if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
- smallContent = data
- chunkOffset += int64(len(data))
+ if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) {
+ chunkOffset += dataSize
+ smallContent = make([]byte, dataSize)
+ bytesBuffer.Read(smallContent)
+ bufPool.Put(bytesBuffer)
+ atomic.AddInt64(&bytesBufferCounter, -1)
+ bytesBufferLimitCond.Signal()
break
}
}
- dataReader := util.NewBytesReader(data)
-
- // retry to assign a different file id
- var fileId, urlLocation string
- var auth security.EncodedJwt
- var assignErr, uploadErr error
- var uploadResult *operation.UploadResult
- for i := 0; i < 3; i++ {
- // assign one file id for one chunk
- fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
- if assignErr != nil {
- return nil, nil, 0, assignErr, nil
- }
- // upload the chunk to the volume server
- uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
- if uploadErr != nil {
- time.Sleep(251 * time.Millisecond)
- continue
+ wg.Add(1)
+ go func(offset int64) {
+ defer func() {
+ bufPool.Put(bytesBuffer)
+ atomic.AddInt64(&bytesBufferCounter, -1)
+ bytesBufferLimitCond.Signal()
+ wg.Done()
+ }()
+
+ chunk, toChunkErr := fs.dataToChunk(fileName, contentType, bytesBuffer.Bytes(), offset, so)
+ if toChunkErr != nil {
+ uploadErr = toChunkErr
}
- break
- }
- if uploadErr != nil {
- return nil, nil, 0, uploadErr, nil
- }
-
- // if last chunk exhausted the reader exactly at the border
- if uploadResult.Size == 0 {
- break
- }
- if chunkOffset == 0 {
- uploadedMd5 := util.Base64Md5ToBytes(uploadResult.ContentMd5)
- readedMd5 := md5Hash.Sum(nil)
- if !bytes.Equal(uploadedMd5, readedMd5) {
- glog.Errorf("md5 %x does not match %x uploaded chunk %s to the volume server", readedMd5, uploadedMd5, uploadResult.Name)
+ if chunk != nil {
+ fileChunksLock.Lock()
+ fileChunks = append(fileChunks, chunk)
+ fileChunksLock.Unlock()
+ glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), chunk.FileId, offset, offset+int64(chunk.Size))
}
- }
-
- // Save to chunk manifest structure
- fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
-
- glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
+ }(chunkOffset)
// reset variables for the next chunk
- chunkOffset = chunkOffset + int64(uploadResult.Size)
+ chunkOffset = chunkOffset + dataSize
// if last chunk was not at full chunk size, but already exhausted the reader
- if int64(uploadResult.Size) < int64(chunkSize) {
+ if dataSize < int64(chunkSize) {
break
}
}
+ wg.Wait()
+
+ if uploadErr != nil {
+ return nil, md5Hash, 0, uploadErr, nil
+ }
+
+ sort.Slice(fileChunks, func(i, j int) bool {
+ return fileChunks[i].Offset < fileChunks[j].Offset
+ })
+
return fileChunks, md5Hash, chunkOffset, nil, smallContent
}
-func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
+func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
stats.FilerRequestCounter.WithLabelValues("chunkUpload").Inc()
start := time.Now()
@@ -111,3 +133,42 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
}
return uploadResult, err, data
}
+
+func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, chunkOffset int64, so *operation.StorageOption) (*filer_pb.FileChunk, error) {
+ dataReader := util.NewBytesReader(data)
+
+ // retry to assign a different file id
+ var fileId, urlLocation string
+ var auth security.EncodedJwt
+ var uploadErr error
+ var uploadResult *operation.UploadResult
+ for i := 0; i < 3; i++ {
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, uploadErr = fs.assignNewFileInfo(so)
+ if uploadErr != nil {
+ glog.V(4).Infof("retry later due to assign error: %v", uploadErr)
+ time.Sleep(time.Duration(i+1) * 251 * time.Millisecond)
+ continue
+ }
+
+ // upload the chunk to the volume server
+ uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
+ if uploadErr != nil {
+ glog.V(4).Infof("retry later due to upload error: %v", uploadErr)
+ time.Sleep(time.Duration(i+1) * 251 * time.Millisecond)
+ continue
+ }
+ break
+ }
+ if uploadErr != nil {
+ glog.Errorf("upload error: %v", uploadErr)
+ return nil, uploadErr
+ }
+
+ // if last chunk exhausted the reader exactly at the border
+ if uploadResult.Size == 0 {
+ return nil, nil
+ }
+
+ return uploadResult.ToPbFileChunk(fileId, chunkOffset), nil
+}
diff --git a/weed/server/filer_ui/filer.html b/weed/server/filer_ui/filer.html
new file mode 100644
index 000000000..84dc4d4d6
--- /dev/null
+++ b/weed/server/filer_ui/filer.html
@@ -0,0 +1,182 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>SeaweedFS Filer</title>
+ <meta name="viewport" content="width=device-width, initial-scale=1">
+ <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
+ <style>
+ body {
+ padding-bottom: 128px;
+ }
+
+ #drop-area {
+ border: 1px transparent;
+ }
+
+ #drop-area.highlight {
+ border-color: purple;
+ border: 2px dashed #ccc;
+ }
+
+ .button {
+ display: inline-block;
+ padding: 2px;
+ background: #ccc;
+ cursor: pointer;
+ border-radius: 2px;
+ border: 1px solid #ccc;
+ float: right;
+ }
+
+ .button:hover {
+ background: #ddd;
+ }
+
+ #fileElem {
+ display: none;
+ }
+
+ .qrImage {
+ display: block;
+ margin-left: auto;
+ margin-right: auto;
+ }
+ </style>
+</head>
+<body>
+<div class="container">
+ <div class="page-header">
+ <h1>
+ <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a>
+ SeaweedFS Filer
+ </h1>
+ </div>
+ <div class="row">
+ <div>
+ {{ range $entry := .Breadcrumbs }}
+ <a href="{{ printpath $entry.Link }}">
+ {{ $entry.Name }}
+ </a>
+ {{ end }}
+ <label class="button" for="fileElem">Upload</label>
+ </div>
+ </div>
+
+ <div class="row" id="drop-area">
+ <form class="upload-form">
+ <input type="file" id="fileElem" multiple onchange="handleFiles(this.files)">
+
+ <table width="90%">
+ {{$path := .Path }}
+ {{ range $entry_index, $entry := .Entries }}
+ <tr>
+ <td>
+ {{if $entry.IsDirectory}}
+ <img src="/seaweedfsstatic/images/folder.gif" width="20" height="23">
+ <a href="{{ printpath $path "/" $entry.Name "/"}}" >
+ {{ $entry.Name }}
+ </a>
+ {{else}}
+ <a href="{{ printpath $path "/" $entry.Name }}" >
+ {{ $entry.Name }}
+ </a>
+ {{end}}
+ </td>
+ <td align="right" nowrap>
+ {{if $entry.IsDirectory}}
+ {{else}}
+ {{ $entry.Mime }}&nbsp;
+ {{end}}
+ </td>
+ <td align="right" nowrap>
+ {{if $entry.IsDirectory}}
+ {{else}}
+ {{ $entry.Size | humanizeBytes }}&nbsp;
+ {{end}}
+ </td>
+ <td nowrap>
+ {{ $entry.Timestamp.Format "2006-01-02 15:04" }}
+ </td>
+ </tr>
+ {{ end }}
+
+ </table>
+ </form>
+ </div>
+
+ {{if .ShouldDisplayLoadMore}}
+ <div class="row">
+ <a href={{ print .Path "?limit=" .Limit "&lastFileName=" .LastFileName}} >
+ Load more
+ </a>
+ </div>
+ {{end}}
+
+ <br/>
+ <br/>
+
+ <div class="navbar navbar-fixed-bottom">
+ <img src="data:image/png;base64,{{.QrImage}}" class="qrImage"/>
+ </div>
+
+</div>
+</body>
+<script type="text/javascript">
+ // ************************ Drag and drop ***************** //
+ let dropArea = document.getElementById("drop-area")
+
+// Prevent default drag behaviors
+ ;['dragenter', 'dragover', 'dragleave', 'drop'].forEach(eventName => {
+ dropArea.addEventListener(eventName, preventDefaults, false)
+ document.body.addEventListener(eventName, preventDefaults, false)
+ })
+
+// Highlight drop area when item is dragged over it
+ ;['dragenter', 'dragover'].forEach(eventName => {
+ dropArea.addEventListener(eventName, highlight, false)
+ })
+
+ ;['dragleave', 'drop'].forEach(eventName => {
+ dropArea.addEventListener(eventName, unhighlight, false)
+ })
+
+ // Handle dropped files
+ dropArea.addEventListener('drop', handleDrop, false)
+
+ function preventDefaults(e) {
+ e.preventDefault()
+ e.stopPropagation()
+ }
+
+ function highlight(e) {
+ dropArea.classList.add('highlight')
+ }
+
+ function unhighlight(e) {
+ dropArea.classList.remove('highlight')
+ }
+
+ function handleDrop(e) {
+ var dt = e.dataTransfer
+ var files = dt.files
+
+ handleFiles(files)
+ }
+
+ function handleFiles(files) {
+ files = [...files]
+ files.forEach(uploadFile)
+ window.location.reload()
+ }
+
+ function uploadFile(file, i) {
+ var url = window.location.href
+ var xhr = new XMLHttpRequest()
+ var formData = new FormData()
+ xhr.open('POST', url, false)
+
+ formData.append('file', file)
+ xhr.send(formData)
+ }
+</script>
+</html>
diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go
index 648b97f22..f9ef064bc 100644
--- a/weed/server/filer_ui/templates.go
+++ b/weed/server/filer_ui/templates.go
@@ -1,6 +1,7 @@
package filer_ui
import (
+ _ "embed"
"github.com/dustin/go-humanize"
"html/template"
"net/url"
@@ -18,178 +19,7 @@ var funcMap = template.FuncMap{
"printpath": printpath,
}
-var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html>
-<html>
-<head>
- <title>SeaweedFS Filer</title>
- <meta name="viewport" content="width=device-width, initial-scale=1">
- <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
-<style>
-body { padding-bottom: 128px; }
-#drop-area {
- border: 1px transparent;
-}
-#drop-area.highlight {
- border-color: purple;
- border: 2px dashed #ccc;
-}
-.button {
- display: inline-block;
- padding: 2px;
- background: #ccc;
- cursor: pointer;
- border-radius: 2px;
- border: 1px solid #ccc;
- float: right;
-}
-.button:hover {
- background: #ddd;
-}
-#fileElem {
- display: none;
-}
-.qrImage {
- display: block;
- margin-left: auto;
- margin-right: auto;
-}
-</style>
-</head>
-<body>
- <div class="container">
- <div class="page-header">
- <h1>
- <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a>
- SeaweedFS Filer
- </h1>
- </div>
- <div class="row">
- <div>
- {{ range $entry := .Breadcrumbs }}
- <a href="{{ printpath $entry.Link }}" >
- {{ $entry.Name }}
- </a>
- {{ end }}
- <label class="button" for="fileElem">Upload</label>
- </div>
- </div>
-
- <div class="row" id="drop-area">
- <form class="upload-form">
- <input type="file" id="fileElem" multiple onchange="handleFiles(this.files)">
-
- <table width="90%">
- {{$path := .Path }}
- {{ range $entry_index, $entry := .Entries }}
- <tr>
- <td>
- {{if $entry.IsDirectory}}
- <img src="/seaweedfsstatic/images/folder.gif" width="20" height="23">
- <a href="{{ printpath $path "/" $entry.Name "/"}}" >
- {{ $entry.Name }}
- </a>
- {{else}}
- <a href="{{ printpath $path "/" $entry.Name }}" >
- {{ $entry.Name }}
- </a>
- {{end}}
- </td>
- <td align="right" nowrap>
- {{if $entry.IsDirectory}}
- {{else}}
- {{ $entry.Mime }}&nbsp;
- {{end}}
- </td>
- <td align="right" nowrap>
- {{if $entry.IsDirectory}}
- {{else}}
- {{ $entry.Size | humanizeBytes }}&nbsp;
- {{end}}
- </td>
- <td nowrap>
- {{ $entry.Timestamp.Format "2006-01-02 15:04" }}
- </td>
- </tr>
- {{ end }}
-
- </table>
- </form>
- </div>
-
- {{if .ShouldDisplayLoadMore}}
- <div class="row">
- <a href={{ print .Path "?limit=" .Limit "&lastFileName=" .LastFileName}} >
- Load more
- </a>
- </div>
- {{end}}
-
- <br/>
- <br/>
-
- <div class="navbar navbar-fixed-bottom">
- <img src="data:image/png;base64,{{.QrImage}}" class="qrImage" />
- </div>
-
- </div>
-</body>
-<script type="text/javascript">
-// ************************ Drag and drop ***************** //
-let dropArea = document.getElementById("drop-area")
-
-// Prevent default drag behaviors
-;['dragenter', 'dragover', 'dragleave', 'drop'].forEach(eventName => {
- dropArea.addEventListener(eventName, preventDefaults, false)
- document.body.addEventListener(eventName, preventDefaults, false)
-})
-
-// Highlight drop area when item is dragged over it
-;['dragenter', 'dragover'].forEach(eventName => {
- dropArea.addEventListener(eventName, highlight, false)
-})
+//go:embed filer.html
+var filerHtml string
-;['dragleave', 'drop'].forEach(eventName => {
- dropArea.addEventListener(eventName, unhighlight, false)
-})
-
-// Handle dropped files
-dropArea.addEventListener('drop', handleDrop, false)
-
-function preventDefaults (e) {
- e.preventDefault()
- e.stopPropagation()
-}
-
-function highlight(e) {
- dropArea.classList.add('highlight')
-}
-
-function unhighlight(e) {
- dropArea.classList.remove('highlight')
-}
-
-function handleDrop(e) {
- var dt = e.dataTransfer
- var files = dt.files
-
- handleFiles(files)
-}
-
-function handleFiles(files) {
- files = [...files]
- files.forEach(uploadFile)
- window.location.reload()
-}
-
-function uploadFile(file, i) {
- var url = window.location.href
- var xhr = new XMLHttpRequest()
- var formData = new FormData()
- xhr.open('POST', url, false)
-
- formData.append('file', file)
- xhr.send(formData)
-}
-</script>
-</html>
-`))
+var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(filerHtml))
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 3e6d9bb9e..50c9dbfdf 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -205,8 +205,8 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
_, err := stream.Recv()
if err != nil {
glog.V(2).Infof("- client %v: %v", clientName, err)
- stopChan <- true
- break
+ close(stopChan)
+ return
}
}
}()
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 3a4951cc5..4132ce690 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -143,7 +143,7 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
maxTimeout = time.Second * 10
startTime = time.Now()
)
-
+
for time.Now().Sub(startTime) < maxTimeout {
fid, count, dn, err := ms.Topo.PickForWrite(req.Count, option)
if err == nil {
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 838803908..eab41524c 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -26,8 +26,9 @@ import (
)
const (
- SequencerType = "master.sequencer.type"
- SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls"
+ SequencerType = "master.sequencer.type"
+ SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls"
+ SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
)
type MasterOption struct {
@@ -97,7 +98,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
ms := &MasterServer{
option: option,
preallocateSize: preallocateSize,
- vgCh: make(chan *topology.VolumeGrowRequest, 1 << 6),
+ vgCh: make(chan *topology.VolumeGrowRequest, 1<<6),
clientChans: make(map[string]chan *master_pb.VolumeLocation),
grpcDialOption: grpcDialOption,
MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers),
@@ -227,6 +228,7 @@ func (ms *MasterServer) startAdminScripts() {
shellOptions.Masters = &masterAddress
shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(filerHostPort)
+ shellOptions.FilerAddress = filerHostPort
shellOptions.Directory = "/"
if err != nil {
glog.V(0).Infof("failed to parse master.filer.default = %s : %v\n", filerHostPort, err)
@@ -293,7 +295,8 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
}
case "snowflake":
var err error
- seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port))
+ snowflakeId := v.GetInt(SequencerSnowflakeId)
+ seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port), snowflakeId)
if err != nil {
glog.Error(err)
seq = nil
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index 974b3308f..0609732c7 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -123,7 +123,7 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
Count: writableVolumeCount,
ErrCh: errCh,
}
- if err := <- errCh; err != nil {
+ if err := <-errCh; err != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("cannot grow volume group! %v", err))
return
}
diff --git a/weed/server/master_ui/master.html b/weed/server/master_ui/master.html
new file mode 100644
index 000000000..e0241d66d
--- /dev/null
+++ b/weed/server/master_ui/master.html
@@ -0,0 +1,110 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>SeaweedFS {{ .Version }}</title>
+ <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
+</head>
+<body>
+<div class="container">
+ <div class="page-header">
+ <h1>
+ <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a>
+ SeaweedFS <small>{{ .Version }}</small>
+ </h1>
+ </div>
+
+ <div class="row">
+ <div class="col-sm-6">
+ <h2>Cluster status</h2>
+ <table class="table table-condensed table-striped">
+ <tbody>
+ <tr>
+ <th>Volume Size Limit</th>
+ <td>{{ .VolumeSizeLimitMB }}MB</td>
+ </tr>
+ <tr>
+ <th>Free</th>
+ <td>{{ .Topology.Free }}</td>
+ </tr>
+ <tr>
+ <th>Max</th>
+ <td>{{ .Topology.Max }}</td>
+ </tr>
+ {{ with .RaftServer }}
+ <tr>
+ <th>Leader</th>
+ <td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td>
+ </tr>
+ <tr>
+ <th>Other Masters</th>
+ <td class="col-sm-5">
+ <ul class="list-unstyled">
+ {{ range $k, $p := .Peers }}
+ <li><a href="http://{{ $p.Name }}/ui/index.html">{{ $p.Name }}</a></li>
+ {{ end }}
+ </ul>
+ </td>
+ </tr>
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
+ <div class="col-sm-6">
+ <h2>System Stats</h2>
+ <table class="table table-condensed table-striped">
+ <tr>
+ <th>Concurrent Connections</th>
+ <td>{{ .Counters.Connections.WeekCounter.Sum }}</td>
+ </tr>
+ {{ range $key, $val := .Stats }}
+ <tr>
+ <th>{{ $key }}</th>
+ <td>{{ $val }}</td>
+ </tr>
+ {{ end }}
+ </table>
+ </div>
+ </div>
+
+ <div class="row">
+ <h2>Topology</h2>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Data Center</th>
+ <th>Rack</th>
+ <th>RemoteAddr</th>
+ <th>#Volumes</th>
+ <th>Volume Ids</th>
+ <th>#ErasureCodingShards</th>
+ <th>Max</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{ range $dc_index, $dc := .Topology.DataCenters }}
+ {{ range $rack_index, $rack := $dc.Racks }}
+ {{ range $dn_index, $dn := $rack.DataNodes }}
+ <tr>
+ <td><code>{{ $dc.Id }}</code></td>
+ <td>{{ $rack.Id }}</td>
+ <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a>
+ {{ if ne $dn.PublicUrl $dn.Url }}
+ / <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a>
+ {{ end }}
+ </td>
+ <td>{{ $dn.Volumes }}</td>
+ <td>{{ $dn.VolumeIds}}</td>
+ <td>{{ $dn.EcShards }}</td>
+ <td>{{ $dn.Max }}</td>
+ </tr>
+ {{ end }}
+ {{ end }}
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
+</div>
+</body>
+</html>
diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go
index 31b6353e9..415022b97 100644
--- a/weed/server/master_ui/templates.go
+++ b/weed/server/master_ui/templates.go
@@ -1,115 +1,11 @@
package master_ui
import (
+ _ "embed"
"html/template"
)
-var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
-<html>
- <head>
- <title>SeaweedFS {{ .Version }}</title>
- <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
- </head>
- <body>
- <div class="container">
- <div class="page-header">
- <h1>
- <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a>
- SeaweedFS <small>{{ .Version }}</small>
- </h1>
- </div>
+//go:embed master.html
+var masterHtml string
- <div class="row">
- <div class="col-sm-6">
- <h2>Cluster status</h2>
- <table class="table table-condensed table-striped">
- <tbody>
- <tr>
- <th>Volume Size Limit</th>
- <td>{{ .VolumeSizeLimitMB }}MB</td>
- </tr>
- <tr>
- <th>Free</th>
- <td>{{ .Topology.Free }}</td>
- </tr>
- <tr>
- <th>Max</th>
- <td>{{ .Topology.Max }}</td>
- </tr>
- {{ with .RaftServer }}
- <tr>
- <th>Leader</th>
- <td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td>
- </tr>
- <tr>
- <th>Other Masters</th>
- <td class="col-sm-5"><ul class="list-unstyled">
- {{ range $k, $p := .Peers }}
- <li><a href="http://{{ $p.Name }}/ui/index.html">{{ $p.Name }}</a></li>
- {{ end }}
- </ul></td>
- </tr>
- {{ end }}
- </tbody>
- </table>
- </div>
-
- <div class="col-sm-6">
- <h2>System Stats</h2>
- <table class="table table-condensed table-striped">
- <tr>
- <th>Concurrent Connections</th>
- <td>{{ .Counters.Connections.WeekCounter.Sum }}</td>
- </tr>
- {{ range $key, $val := .Stats }}
- <tr>
- <th>{{ $key }}</th>
- <td>{{ $val }}</td>
- </tr>
- {{ end }}
- </table>
- </div>
- </div>
-
- <div class="row">
- <h2>Topology</h2>
- <table class="table table-striped">
- <thead>
- <tr>
- <th>Data Center</th>
- <th>Rack</th>
- <th>RemoteAddr</th>
- <th>#Volumes</th>
- <th>Volume Ids</th>
- <th>#ErasureCodingShards</th>
- <th>Max</th>
- </tr>
- </thead>
- <tbody>
- {{ range $dc_index, $dc := .Topology.DataCenters }}
- {{ range $rack_index, $rack := $dc.Racks }}
- {{ range $dn_index, $dn := $rack.DataNodes }}
- <tr>
- <td><code>{{ $dc.Id }}</code></td>
- <td>{{ $rack.Id }}</td>
- <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a>
- {{ if ne $dn.PublicUrl $dn.Url }}
- / <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a>
- {{ end }}
- </td>
- <td>{{ $dn.Volumes }}</td>
- <td>{{ $dn.VolumeIds}}</td>
- <td>{{ $dn.EcShards }}</td>
- <td>{{ $dn.Max }}</td>
- </tr>
- {{ end }}
- {{ end }}
- {{ end }}
- </tbody>
- </table>
- </div>
-
- </div>
- </body>
-</html>
-`))
+var StatusTpl = template.Must(template.New("status").Parse(masterHtml))
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index 2bc108a23..898c3da12 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -225,9 +225,9 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
if !hasEcVolume {
return nil, fmt.Errorf("volume not found %d", req.VolumeId)
}
- count, err = vs.store.ReadEcShardNeedle(volumeId, n)
+ count, err = vs.store.ReadEcShardNeedle(volumeId, n, nil)
} else {
- count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil)
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil, nil)
}
if err != nil {
return nil, err
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go
index 8e84dc2a8..3645ad9c9 100644
--- a/weed/server/volume_grpc_batch_delete.go
+++ b/weed/server/volume_grpc_batch_delete.go
@@ -8,7 +8,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
)
func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) {
@@ -30,7 +29,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
n := new(needle.Needle)
volumeId, _ := needle.NewVolumeId(vid)
if req.SkipCookieCheck {
- n.Id, err = types.ParseNeedleId(id_cookie)
+ n.Id, _, err = needle.ParseNeedleIdCookie(id_cookie)
if err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
@@ -41,7 +40,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
} else {
n.ParsePath(id_cookie)
cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusNotFound,
diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go
index 2f4fab96a..349d10097 100644
--- a/weed/server/volume_grpc_query.go
+++ b/weed/server/volume_grpc_query.go
@@ -24,7 +24,7 @@ func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_
n.ParsePath(id_cookie)
cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil); err != nil {
glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
return err
}
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
new file mode 100644
index 000000000..fd9db2246
--- /dev/null
+++ b/weed/server/volume_grpc_remote.go
@@ -0,0 +1,56 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
+)
+
+func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) {
+ resp = &volume_server_pb.FetchAndWriteNeedleResponse{}
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ remoteConf := &filer_pb.RemoteConf{
+ Type: req.RemoteType,
+ Name: req.RemoteName,
+ S3AccessKey: req.S3AccessKey,
+ S3SecretKey: req.S3SecretKey,
+ S3Region: req.S3Region,
+ S3Endpoint: req.S3Endpoint,
+ }
+
+ client, getClientErr := remote_storage.GetRemoteStorage(remoteConf)
+ if getClientErr != nil {
+ return nil, fmt.Errorf("get remote client: %v", getClientErr)
+ }
+
+ remoteStorageLocation := &filer_pb.RemoteStorageLocation{
+ Name: req.RemoteName,
+ Bucket: req.RemoteBucket,
+ Path: req.RemotePath,
+ }
+ data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size)
+ if ReadRemoteErr != nil {
+ return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr)
+ }
+
+ n := new(needle.Needle)
+ n.Id = types.NeedleId(req.NeedleId)
+ n.Cookie = types.Cookie(req.Cookie)
+ n.Data, n.DataSize = data, uint32(len(data))
+ // copied from *Needle.prepareWriteBuffer()
+ n.Size = 4 + types.Size(n.DataSize) + 1
+ n.Checksum = needle.NewCRC(n.Data)
+ if _, err = vs.store.WriteVolumeNeedle(v.Id, n, false); err != nil {
+ return nil, fmt.Errorf("write needle %d size %d: %v", req.NeedleId, req.Size, err)
+ }
+
+ return resp, nil
+}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index f7359ea6b..034521b4b 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -17,6 +17,13 @@ import (
)
type VolumeServer struct {
+ inFlightUploadDataSize int64
+ inFlightDownloadDataSize int64
+ concurrentUploadLimit int64
+ concurrentDownloadLimit int64
+ inFlightUploadDataLimitCond *sync.Cond
+ inFlightDownloadDataLimitCond *sync.Cond
+
SeedMasterNodes []string
currentMaster string
pulseSeconds int
@@ -28,17 +35,13 @@ type VolumeServer struct {
needleMapKind storage.NeedleMapKind
FixJpgOrientation bool
- ReadRedirect bool
+ ReadMode string
compactionBytePerSecond int64
metricsAddress string
metricsIntervalSec int
fileSizeLimitBytes int64
isHeartbeating bool
stopChan chan bool
-
- inFlightDataSize int64
- inFlightDataLimitCond *sync.Cond
- concurrentUploadLimit int64
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -50,10 +53,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
dataCenter string, rack string,
whiteList []string,
fixJpgOrientation bool,
- readRedirect bool,
+ readMode string,
compactionMBPerSecond int,
fileSizeLimitMB int,
concurrentUploadLimit int64,
+ concurrentDownloadLimit int64,
) *VolumeServer {
v := util.GetViper()
@@ -67,19 +71,21 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds")
vs := &VolumeServer{
- pulseSeconds: pulseSeconds,
- dataCenter: dataCenter,
- rack: rack,
- needleMapKind: needleMapKind,
- FixJpgOrientation: fixJpgOrientation,
- ReadRedirect: readRedirect,
- grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
- compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
- fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
- isHeartbeating: true,
- stopChan: make(chan bool),
- inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
- concurrentUploadLimit: concurrentUploadLimit,
+ pulseSeconds: pulseSeconds,
+ dataCenter: dataCenter,
+ rack: rack,
+ needleMapKind: needleMapKind,
+ FixJpgOrientation: fixJpgOrientation,
+ ReadMode: readMode,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
+ compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
+ fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
+ isHeartbeating: true,
+ stopChan: make(chan bool),
+ inFlightUploadDataLimitCond: sync.NewCond(new(sync.Mutex)),
+ inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
+ concurrentUploadLimit: concurrentUploadLimit,
+ concurrentDownloadLimit: concurrentDownloadLimit,
}
vs.SeedMasterNodes = masterNodes
@@ -112,6 +118,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
return vs
}
+func (vs *VolumeServer) SetStopping() {
+ glog.V(0).Infoln("Stopping volume server...")
+ vs.store.SetStopping()
+}
+
func (vs *VolumeServer) Shutdown() {
glog.V(0).Infoln("Shutting down volume server...")
vs.store.Close()
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 4527add44..ed7807bb8 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -37,6 +37,11 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
switch r.Method {
case "GET", "HEAD":
stats.ReadRequest()
+ vs.inFlightDownloadDataLimitCond.L.Lock()
+ for vs.concurrentDownloadLimit != 0 && atomic.LoadInt64(&vs.inFlightDownloadDataSize) > vs.concurrentDownloadLimit {
+ glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit)
+ vs.inFlightDownloadDataLimitCond.Wait()
+ }
vs.GetOrHeadHandler(w, r)
case "DELETE":
stats.DeleteRequest()
@@ -45,15 +50,16 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
// wait until in flight data is less than the limit
contentLength := getContentLength(r)
- vs.inFlightDataLimitCond.L.Lock()
- for atomic.LoadInt64(&vs.inFlightDataSize) > vs.concurrentUploadLimit {
- vs.inFlightDataLimitCond.Wait()
+ vs.inFlightUploadDataLimitCond.L.Lock()
+ for vs.concurrentUploadLimit != 0 && atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit {
+ glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
+ vs.inFlightUploadDataLimitCond.Wait()
}
- atomic.AddInt64(&vs.inFlightDataSize, contentLength)
- vs.inFlightDataLimitCond.L.Unlock()
+ atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength)
+ vs.inFlightUploadDataLimitCond.L.Unlock()
defer func() {
- atomic.AddInt64(&vs.inFlightDataSize, -contentLength)
- vs.inFlightDataLimitCond.Signal()
+ atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength)
+ vs.inFlightUploadDataLimitCond.Signal()
}()
// processs uploads
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 3e977cfd4..ae3c0b53f 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"io"
"mime"
"net/http"
@@ -12,6 +13,7 @@ import (
"path/filepath"
"strconv"
"strings"
+ "sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -58,14 +60,53 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
hasVolume := vs.store.HasVolume(volumeId)
_, hasEcVolume := vs.store.FindEcVolume(volumeId)
if !hasVolume && !hasEcVolume {
- if !vs.ReadRedirect {
- glog.V(2).Infoln("volume is not local:", err, r.URL.Path)
+ if vs.ReadMode == "local" {
+ glog.V(0).Infoln("volume is not local:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound)
return
}
lookupResult, err := operation.Lookup(vs.GetMaster, volumeId.String())
glog.V(2).Infoln("volume", volumeId, "found on", lookupResult, "error", err)
- if err == nil && len(lookupResult.Locations) > 0 {
+ if err != nil || len(lookupResult.Locations) <= 0 {
+ glog.V(0).Infoln("lookup error:", err, r.URL.Path)
+ w.WriteHeader(http.StatusNotFound)
+ return
+ }
+ if vs.ReadMode == "proxy" {
+ // proxy client request to target server
+ u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].Url))
+ r.URL.Host = u.Host
+ r.URL.Scheme = u.Scheme
+ request, err := http.NewRequest("GET", r.URL.String(), nil)
+ if err != nil {
+ glog.V(0).Infof("failed to instance http request of url %s: %v", r.URL.String(), err)
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+ for k, vv := range r.Header {
+ for _, v := range vv {
+ request.Header.Add(k, v)
+ }
+ }
+
+ response, err := client.Do(request)
+ if err != nil {
+ glog.V(0).Infof("request remote url %s: %v", r.URL.String(), err)
+ w.WriteHeader(http.StatusInternalServerError)
+ return
+ }
+ defer util.CloseResponse(response)
+ // proxy target response to client
+ for k, vv := range response.Header {
+ for _, v := range vv {
+ w.Header().Add(k, v)
+ }
+ }
+ w.WriteHeader(response.StatusCode)
+ io.Copy(w, response.Body)
+ return
+ } else {
+ // redirect
u, _ := url.Parse(util.NormalizeUrl(lookupResult.Locations[0].PublicUrl))
u.Path = fmt.Sprintf("%s/%s,%s", u.Path, vid, fid)
arg := url.Values{}
@@ -74,12 +115,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
u.RawQuery = arg.Encode()
http.Redirect(w, r, u.String(), http.StatusMovedPermanently)
-
- } else {
- glog.V(2).Infoln("lookup error:", err, r.URL.Path)
- w.WriteHeader(http.StatusNotFound)
+ return
}
- return
}
cookie := n.Cookie
@@ -88,11 +125,22 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
var count int
+ var needleSize types.Size
+ onReadSizeFn := func(size types.Size) {
+ needleSize = size
+ atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(needleSize))
+ vs.inFlightDownloadDataLimitCond.L.Unlock()
+ }
if hasVolume {
- count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption)
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption, onReadSizeFn)
} else if hasEcVolume {
- count, err = vs.store.ReadEcShardNeedle(volumeId, n)
+ count, err = vs.store.ReadEcShardNeedle(volumeId, n, onReadSizeFn)
}
+ defer func() {
+ atomic.AddInt64(&vs.inFlightDownloadDataSize, -int64(needleSize))
+ vs.inFlightDownloadDataLimitCond.Signal()
+ }()
+
if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume {
glog.V(4).Infof("read needle: %v", err)
// start to fix it from other replicas, if not deleted and hasVolume and is not a replicated request
@@ -229,7 +277,7 @@ func conditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext strin
}
func shouldResizeImages(ext string, r *http.Request) (width, height int, mode string, shouldResize bool) {
- if ext == ".png" || ext == ".jpg" || ext == ".jpeg" || ext == ".gif" {
+ if ext == ".png" || ext == ".jpg" || ext == ".jpeg" || ext == ".gif" || ext == ".webp" {
if r.FormValue("width") != "" {
width, _ = strconv.Atoi(r.FormValue("width"))
}
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 3d752eda6..aeb7d6e65 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "bytes"
"errors"
"fmt"
"net/http"
@@ -42,7 +43,10 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
+ bytesBuffer := bufPool.Get().(*bytes.Buffer)
+ defer bufPool.Put(bytesBuffer)
+
+ reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes, bytesBuffer)
if ne != nil {
writeJsonError(w, r, http.StatusBadRequest, ne)
return
@@ -104,7 +108,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return
}
- _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil)
+ _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil, nil)
if ok != nil {
m := make(map[string]uint32)
m["size"] = 0
diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go
index 79f1a14a0..d85eb247a 100644
--- a/weed/server/volume_server_ui/templates.go
+++ b/weed/server/volume_server_ui/templates.go
@@ -1,6 +1,7 @@
package volume_server_ui
import (
+ _ "embed"
"fmt"
"github.com/chrislusf/seaweedfs/weed/util"
"html/template"
@@ -24,193 +25,10 @@ var funcMap = template.FuncMap{
"join": join,
"bytesToHumanReadable": util.BytesToHumanReadable,
"percentFrom": percentFrom,
+ "isNotEmpty": util.IsNotEmpty,
}
-var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html>
-<html>
- <head>
- <title>SeaweedFS {{ .Version }}</title>
- <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
- <script type="text/javascript" src="/seaweedfsstatic/javascript/jquery-3.6.0.min.js"></script>
- <script type="text/javascript" src="/seaweedfsstatic/javascript/jquery-sparklines/2.1.2/jquery.sparkline.min.js"></script>
- <script type="text/javascript">
- $(function() {
- var periods = ['second', 'minute', 'hour', 'day'];
- for (i = 0; i < periods.length; i++) {
- var period = periods[i];
- $('.inlinesparkline-'+period).sparkline('html', {
- type: 'line',
- barColor: 'red',
- tooltipSuffix:' request per '+period,
- });
- }
- });
- </script>
- <style>
- #jqstooltip{
- height: 28px !important;
- width: 150px !important;
- }
- </style>
- </head>
- <body>
- <div class="container">
- <div class="page-header">
- <h1>
- <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a>
- SeaweedFS <small>{{ .Version }}</small>
- </h1>
- </div>
-
- <div class="row">
- <div class="col-sm-6">
- <h2>Disk Stats</h2>
- <table class="table table-striped">
- <thead>
- <tr>
- <th>Path</th>
- <th>Disk</th>
- <th>Total</th>
- <th>Free</th>
- <th>Usage</th>
- </tr>
- </thead>
- <tbody>
- {{ range .DiskStatuses }}
- <tr>
- <td>{{ .Dir }}</td>
- <td>{{ .DiskType }}</td>
- <td>{{ bytesToHumanReadable .All }}</td>
- <td>{{ bytesToHumanReadable .Free }}</td>
- <td>{{ percentFrom .All .Used}}%</td>
- </tr>
- {{ end }}
- </tbody>
- </table>
- </div>
-
- <div class="col-sm-6">
- <h2>System Stats</h2>
- <table class="table table-condensed table-striped">
- <tr>
- <th>Masters</th>
- <td>{{.Masters}}</td>
- </tr>
- <tr>
- <th>Weekly # ReadRequests</th>
- <td><span class="inlinesparkline-day">{{ .Counters.ReadRequests.WeekCounter.ToList | join }}</span></td>
- </tr>
- <tr>
- <th>Daily # ReadRequests</th>
- <td><span class="inlinesparkline-hour">{{ .Counters.ReadRequests.DayCounter.ToList | join }}</span></td>
- </tr>
- <tr>
- <th>Hourly # ReadRequests</th>
- <td><span class="inlinesparkline-minute">{{ .Counters.ReadRequests.HourCounter.ToList | join }}</span></td>
- </tr>
- <tr>
- <th>Last Minute # ReadRequests</th>
- <td><span class="inlinesparkline-second">{{ .Counters.ReadRequests.MinuteCounter.ToList | join }}</span></td>
- </tr>
- {{ range $key, $val := .Stats }}
- <tr>
- <th>{{ $key }}</th>
- <td>{{ $val }}</td>
- </tr>
- {{ end }}
- </table>
- </div>
- </div>
-
- <div class="row">
- <h2>Volumes</h2>
- <table class="table table-striped">
- <thead>
- <tr>
- <th>Id</th>
- <th>Collection</th>
- <th>Disk</th>
- <th>Data Size</th>
- <th>Files</th>
- <th>Trash</th>
- <th>TTL</th>
- <th>ReadOnly</th>
- </tr>
- </thead>
- <tbody>
- {{ range .Volumes }}
- <tr>
- <td><code>{{ .Id }}</code></td>
- <td>{{ .Collection }}</td>
- <td>{{ .DiskType }}</td>
- <td>{{ bytesToHumanReadable .Size }}</td>
- <td>{{ .FileCount }}</td>
- <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td>
- <td>{{ .Ttl }}</td>
- <td>{{ .ReadOnly }}</td>
- </tr>
- {{ end }}
- </tbody>
- </table>
- </div>
-
- <div class="row">
- <h2>Remote Volumes</h2>
- <table class="table table-striped">
- <thead>
- <tr>
- <th>Id</th>
- <th>Collection</th>
- <th>Size</th>
- <th>Files</th>
- <th>Trash</th>
- <th>Remote</th>
- <th>Key</th>
- </tr>
- </thead>
- <tbody>
- {{ range .RemoteVolumes }}
- <tr>
- <td><code>{{ .Id }}</code></td>
- <td>{{ .Collection }}</td>
- <td>{{ bytesToHumanReadable .Size }}</td>
- <td>{{ .FileCount }}</td>
- <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td>
- <td>{{ .RemoteStorageName }}</td>
- <td>{{ .RemoteStorageKey }}</td>
- </tr>
- {{ end }}
- </tbody>
- </table>
- </div>
-
- <div class="row">
- <h2>Erasure Coding Shards</h2>
- <table class="table table-striped">
- <thead>
- <tr>
- <th>Id</th>
- <th>Collection</th>
- <th>Shard Size</th>
- <th>Shards</th>
- <th>CreatedAt</th>
- </tr>
- </thead>
- <tbody>
- {{ range .EcVolumes }}
- <tr>
- <td><code>{{ .VolumeId }}</code></td>
- <td>{{ .Collection }}</td>
- <td>{{ bytesToHumanReadable .ShardSize }}</td>
- <td>{{ .ShardIdList }}</td>
- <td>{{ .CreatedAt.Format "02 Jan 06 15:04 -0700" }}</td>
- </tr>
- {{ end }}
- </tbody>
- </table>
- </div>
+//go:embed volume.html
+var volumeHtml string
- </div>
- </body>
-</html>
-`))
+var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(volumeHtml))
diff --git a/weed/server/volume_server_ui/volume.html b/weed/server/volume_server_ui/volume.html
new file mode 100644
index 000000000..91809beb0
--- /dev/null
+++ b/weed/server/volume_server_ui/volume.html
@@ -0,0 +1,197 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>SeaweedFS {{ .Version }}</title>
+ <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
+ <script type="text/javascript" src="/seaweedfsstatic/javascript/jquery-3.6.0.min.js"></script>
+ <script type="text/javascript"
+ src="/seaweedfsstatic/javascript/jquery-sparklines/2.1.2/jquery.sparkline.min.js"></script>
+ <script type="text/javascript">
+ $(function () {
+ var periods = ['second', 'minute', 'hour', 'day'];
+ for (i = 0; i < periods.length; i++) {
+ var period = periods[i];
+ $('.inlinesparkline-' + period).sparkline('html', {
+ type: 'line',
+ barColor: 'red',
+ tooltipSuffix: ' request per ' + period,
+ });
+ }
+ });
+ </script>
+ <style>
+ #jqstooltip {
+ height: 28px !important;
+ width: 150px !important;
+ }
+ </style>
+</head>
+<body>
+<div class="container">
+ <div class="page-header">
+ <h1>
+ <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a>
+ SeaweedFS <small>{{ .Version }}</small>
+ </h1>
+ </div>
+
+ <div class="row">
+ <div class="col-sm-6">
+ <h2>Disk Stats</h2>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Path</th>
+ <th>Disk</th>
+ <th>Total</th>
+ <th>Free</th>
+ <th>Usage</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{ range .DiskStatuses }}
+ <tr>
+ <td>{{ .Dir }}</td>
+ <td>{{ .DiskType }}</td>
+ <td>{{ bytesToHumanReadable .All }}</td>
+ <td>{{ bytesToHumanReadable .Free }}</td>
+ <td>{{ percentFrom .All .Used}}%</td>
+ </tr>
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
+ <div class="col-sm-6">
+ <h2>System Stats</h2>
+ <table class="table table-condensed table-striped">
+ <tr>
+ <th>Masters</th>
+ <td>{{.Masters}}</td>
+ </tr>
+ <tr>
+ <th>Weekly # ReadRequests</th>
+ <td><span class="inlinesparkline-day">{{ .Counters.ReadRequests.WeekCounter.ToList | join }}</span>
+ </td>
+ </tr>
+ <tr>
+ <th>Daily # ReadRequests</th>
+ <td><span class="inlinesparkline-hour">{{ .Counters.ReadRequests.DayCounter.ToList | join }}</span>
+ </td>
+ </tr>
+ <tr>
+ <th>Hourly # ReadRequests</th>
+ <td><span
+ class="inlinesparkline-minute">{{ .Counters.ReadRequests.HourCounter.ToList | join }}</span>
+ </td>
+ </tr>
+ <tr>
+ <th>Last Minute # ReadRequests</th>
+ <td><span
+ class="inlinesparkline-second">{{ .Counters.ReadRequests.MinuteCounter.ToList | join }}</span>
+ </td>
+ </tr>
+ {{ range $key, $val := .Stats }}
+ <tr>
+ <th>{{ $key }}</th>
+ <td>{{ $val }}</td>
+ </tr>
+ {{ end }}
+ </table>
+ </div>
+ </div>
+
+ <div class="row">
+ <h2>Volumes</h2>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Id</th>
+ <th>Collection</th>
+ <th>Disk</th>
+ <th>Data Size</th>
+ <th>Files</th>
+ <th>Trash</th>
+ <th>TTL</th>
+ <th>ReadOnly</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{ range .Volumes }}
+ <tr>
+ <td><code>{{ .Id }}</code></td>
+ <td>{{ .Collection }}</td>
+ <td>{{ .DiskType }}</td>
+ <td>{{ bytesToHumanReadable .Size }}</td>
+ <td>{{ .FileCount }}</td>
+ <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td>
+ <td>{{ .Ttl }}</td>
+ <td>{{ .ReadOnly }}</td>
+ </tr>
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
+ {{ if isNotEmpty .RemoteVolumes }}
+ <div class="row">
+ <h2>Remote Volumes</h2>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Id</th>
+ <th>Collection</th>
+ <th>Size</th>
+ <th>Files</th>
+ <th>Trash</th>
+ <th>Remote</th>
+ <th>Key</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{ range .RemoteVolumes }}
+ <tr>
+ <td><code>{{ .Id }}</code></td>
+ <td>{{ .Collection }}</td>
+ <td>{{ bytesToHumanReadable .Size }}</td>
+ <td>{{ .FileCount }}</td>
+ <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td>
+ <td>{{ .RemoteStorageName }}</td>
+ <td>{{ .RemoteStorageKey }}</td>
+ </tr>
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+ {{ end }}
+
+ {{ if isNotEmpty .EcVolumes }}
+ <div class="row">
+ <h2>Erasure Coding Shards</h2>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Id</th>
+ <th>Collection</th>
+ <th>Shard Size</th>
+ <th>Shards</th>
+ <th>CreatedAt</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{ range .EcVolumes }}
+ <tr>
+ <td><code>{{ .VolumeId }}</code></td>
+ <td>{{ .Collection }}</td>
+ <td>{{ bytesToHumanReadable .ShardSize }}</td>
+ <td>{{ .ShardIdList }}</td>
+ <td>{{ .CreatedAt.Format "02 Jan 06 15:04 -0700" }}</td>
+ </tr>
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+ {{ end }}
+</div>
+</body>
+</html>
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index c6550a36f..68c1f3233 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -532,7 +532,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
return 0, io.EOF
}
if f.entryViewCache == nil {
- f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks)
+ f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, math.MaxInt64)
f.reader = nil
}
if f.reader == nil {