aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorustuzhanin <55892859+ustuzhanin@users.noreply.github.com>2020-10-02 22:47:25 +0500
committerGitHub <noreply@github.com>2020-10-02 22:47:25 +0500
commit3e0a79ef050dba9e5347d20537ef562cc4b30b62 (patch)
treee0b42e531d18136d9e272258187a305690ee2b4d /weed/server
parentcbd80253e33688f55c02dd29c994a3ee6eac3d6c (diff)
parent9ab98fa912814686b3035a97b5173c1628fbc0fc (diff)
downloadseaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.tar.xz
seaweedfs-3e0a79ef050dba9e5347d20537ef562cc4b30b62.zip
Merge pull request #1 from chrislusf/master
Merge upstream
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server.go113
-rw-r--r--weed/server/filer_grpc_server_kv.go42
-rw-r--r--weed/server/filer_grpc_server_rename.go21
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go84
-rw-r--r--weed/server/filer_server.go73
-rw-r--r--weed/server/filer_server_handlers.go3
-rw-r--r--weed/server/filer_server_handlers_read.go12
-rw-r--r--weed/server/filer_server_handlers_read_dir.go2
-rw-r--r--weed/server/filer_server_handlers_write.go213
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go255
-rw-r--r--weed/server/filer_server_handlers_write_cipher.go9
-rw-r--r--weed/server/filer_ui/templates.go15
-rw-r--r--weed/server/master_grpc_server.go37
-rw-r--r--weed/server/master_grpc_server_volume.go3
-rw-r--r--weed/server/master_server.go3
-rw-r--r--weed/server/master_server_handlers_admin.go2
-rw-r--r--weed/server/volume_grpc_admin.go42
-rw-r--r--weed/server/volume_grpc_batch_delete.go4
-rw-r--r--weed/server/volume_grpc_client_to_master.go76
-rw-r--r--weed/server/volume_grpc_copy.go9
-rw-r--r--weed/server/volume_grpc_erasure_coding.go4
-rw-r--r--weed/server/volume_grpc_query.go2
-rw-r--r--weed/server/volume_server.go19
-rw-r--r--weed/server/volume_server_handlers.go3
-rw-r--r--weed/server/volume_server_handlers_admin.go2
-rw-r--r--weed/server/volume_server_handlers_read.go10
-rw-r--r--weed/server/volume_server_handlers_ui.go1
-rw-r--r--weed/server/volume_server_handlers_write.go5
-rw-r--r--weed/server/webdav_server.go44
29 files changed, 579 insertions, 529 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 48e9253f0..ecd23413f 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -6,10 +6,9 @@ import (
"os"
"path/filepath"
"strconv"
- "strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -35,9 +34,11 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
Entry: &filer_pb.Entry{
Name: req.Name,
IsDirectory: entry.IsDirectory(),
- Attributes: filer2.EntryAttributeToPb(entry),
+ Attributes: filer.EntryAttributeToPb(entry),
Chunks: entry.Chunks,
Extended: entry.Extended,
+ HardLinkId: entry.HardLinkId,
+ HardLinkCounter: entry.HardLinkCounter,
},
}, nil
}
@@ -51,7 +52,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
limit = fs.option.DirListingLimit
}
- paginationLimit := filer2.PaginationSize
+ paginationLimit := filer.PaginationSize
if limit < paginationLimit {
paginationLimit = limit
}
@@ -59,7 +60,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
for limit > 0 {
- entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
+ entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix)
if err != nil {
return err
@@ -74,19 +75,15 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName = entry.Name()
- if req.Prefix != "" {
- if !strings.HasPrefix(entry.Name(), req.Prefix) {
- continue
- }
- }
-
if err := stream.Send(&filer_pb.ListEntriesResponse{
Entry: &filer_pb.Entry{
Name: entry.Name(),
IsDirectory: entry.IsDirectory(),
Chunks: entry.Chunks,
- Attributes: filer2.EntryAttributeToPb(entry),
+ Attributes: filer.EntryAttributeToPb(entry),
Extended: entry.Extended,
+ HardLinkId: entry.HardLinkId,
+ HardLinkCounter: entry.HardLinkCounter,
},
}); err != nil {
return err
@@ -161,17 +158,14 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
}
- if req.Entry.Attributes == nil {
- glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name))
- resp.Error = fmt.Sprintf("can not create entry with empty attributes")
- return
- }
-
- createErr := fs.filer.CreateEntry(ctx, &filer2.Entry{
+ createErr := fs.filer.CreateEntry(ctx, &filer.Entry{
FullPath: util.JoinPath(req.Directory, req.Entry.Name),
- Attr: filer2.PbToEntryAttribute(req.Entry.Attributes),
+ Attr: filer.PbToEntryAttribute(req.Entry.Attributes),
Chunks: chunks,
- }, req.OExcl, req.IsFromOtherCluster)
+ Extended: req.Entry.Extended,
+ HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
+ HardLinkCounter: req.Entry.HardLinkCounter,
+ }, req.OExcl, req.IsFromOtherCluster, req.Signatures)
if createErr == nil {
fs.filer.DeleteChunks(garbage)
@@ -198,11 +192,13 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
}
- newEntry := &filer2.Entry{
+ newEntry := &filer.Entry{
FullPath: util.JoinPath(req.Directory, req.Entry.Name),
Attr: entry.Attr,
Extended: req.Entry.Extended,
Chunks: chunks,
+ HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
+ HardLinkCounter: req.Entry.HardLinkCounter,
}
glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v",
@@ -225,49 +221,53 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
- if filer2.EqualEntry(entry, newEntry) {
+ if filer.EqualEntry(entry, newEntry) {
return &filer_pb.UpdateEntryResponse{}, err
}
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
fs.filer.DeleteChunks(garbage)
+
+ fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures)
+
} else {
glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
}
- fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster)
-
return &filer_pb.UpdateEntryResponse{}, err
}
-func (fs *FilerServer) cleanupChunks(existingEntry *filer2.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
- chunks = newEntry.Chunks
+func (fs *FilerServer) cleanupChunks(existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
// remove old chunks if not included in the new ones
if existingEntry != nil {
- garbage, err = filer2.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks)
+ garbage, err = filer.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks)
if err != nil {
- return chunks, nil, fmt.Errorf("MinusChunks: %v", err)
+ return newEntry.Chunks, nil, fmt.Errorf("MinusChunks: %v", err)
}
}
// files with manifest chunks are usually large and append only, skip calculating covered chunks
- var coveredChunks []*filer_pb.FileChunk
- if !filer2.HasChunkManifest(newEntry.Chunks) {
- chunks, coveredChunks = filer2.CompactFileChunks(fs.lookupFileId, newEntry.Chunks)
- garbage = append(garbage, coveredChunks...)
+ manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.Chunks)
+
+ chunks, coveredChunks := filer.CompactFileChunks(fs.lookupFileId, nonManifestChunks)
+ garbage = append(garbage, coveredChunks...)
+
+ if newEntry.Attributes != nil {
+ chunks, err = filer.MaybeManifestize(fs.saveAsChunk(
+ newEntry.Attributes.Replication,
+ newEntry.Attributes.Collection,
+ "",
+ needle.SecondsToTTL(newEntry.Attributes.TtlSec),
+ false), chunks)
+ if err != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", err)
+ }
}
- chunks, err = filer2.MaybeManifestize(fs.saveAsChunk(
- newEntry.Attributes.Replication,
- newEntry.Attributes.Collection,
- "",
- needle.SecondsToTTL(newEntry.Attributes.TtlSec),
- false), chunks)
- if err != nil {
- // not good, but should be ok
- glog.V(0).Infof("MaybeManifestize: %v", err)
- }
+ chunks = append(chunks, manifestChunks...)
+
return
}
@@ -279,9 +279,9 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
var offset int64 = 0
entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
if err == filer_pb.ErrNotFound {
- entry = &filer2.Entry{
+ entry = &filer.Entry{
FullPath: fullpath,
- Attr: filer2.Attr{
+ Attr: filer.Attr{
Crtime: time.Now(),
Mtime: time.Now(),
Mode: os.FileMode(0644),
@@ -290,7 +290,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
},
}
} else {
- offset = int64(filer2.TotalSize(entry.Chunks))
+ offset = int64(filer.TotalSize(entry.Chunks))
}
for _, chunk := range req.Chunks {
@@ -300,7 +300,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
entry.Chunks = append(entry.Chunks, req.Chunks...)
- entry.Chunks, err = filer2.MaybeManifestize(fs.saveAsChunk(
+ entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(
entry.Replication,
entry.Collection,
"",
@@ -311,7 +311,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
glog.V(0).Infof("MaybeManifestize: %v", err)
}
- err = fs.filer.CreateEntry(context.Background(), entry, false, false)
+ err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil)
return &filer_pb.AppendToEntryResponse{}, err
}
@@ -320,7 +320,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
glog.V(4).Infof("DeleteEntry %v", req)
- err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster)
+ err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures)
resp = &filer_pb.DeleteEntryResponse{}
if err != nil {
resp.Error = err.Error()
@@ -426,12 +426,15 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
t := &filer_pb.GetFilerConfigurationResponse{
- Masters: fs.option.Masters,
- Collection: fs.option.Collection,
- Replication: fs.option.DefaultReplication,
- MaxMb: uint32(fs.option.MaxMB),
- DirBuckets: fs.filer.DirBucketsPath,
- Cipher: fs.filer.Cipher,
+ Masters: fs.option.Masters,
+ Collection: fs.option.Collection,
+ Replication: fs.option.DefaultReplication,
+ MaxMb: uint32(fs.option.MaxMB),
+ DirBuckets: fs.filer.DirBucketsPath,
+ Cipher: fs.filer.Cipher,
+ Signature: fs.filer.Signature,
+ MetricsAddress: fs.metricsAddress,
+ MetricsIntervalSec: int32(fs.metricsIntervalSec),
}
glog.V(4).Infof("GetFilerConfiguration: %v", t)
diff --git a/weed/server/filer_grpc_server_kv.go b/weed/server/filer_grpc_server_kv.go
new file mode 100644
index 000000000..3cb47115e
--- /dev/null
+++ b/weed/server/filer_grpc_server_kv.go
@@ -0,0 +1,42 @@
+package weed_server
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (fs *FilerServer) KvGet(ctx context.Context, req *filer_pb.KvGetRequest) (*filer_pb.KvGetResponse, error) {
+
+ value, err := fs.filer.Store.KvGet(ctx, req.Key)
+ if err == filer.ErrKvNotFound {
+ return &filer_pb.KvGetResponse{}, nil
+ }
+
+ if err != nil {
+ return &filer_pb.KvGetResponse{Error: err.Error()}, nil
+ }
+
+ return &filer_pb.KvGetResponse{
+ Value: value,
+ }, nil
+
+}
+
+// KvPut sets the key~value. if empty value, delete the kv entry
+func (fs *FilerServer) KvPut(ctx context.Context, req *filer_pb.KvPutRequest) (*filer_pb.KvPutResponse, error) {
+
+ if len(req.Value) == 0 {
+ if err := fs.filer.Store.KvDelete(ctx, req.Key); err != nil {
+ return &filer_pb.KvPutResponse{Error: err.Error()}, nil
+ }
+ }
+
+ err := fs.filer.Store.KvPut(ctx, req.Key, req.Value)
+ if err != nil {
+ return &filer_pb.KvPutResponse{Error: err.Error()}, nil
+ }
+
+ return &filer_pb.KvPutResponse{}, nil
+
+}
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 9642fec24..f9ddeb600 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -5,7 +5,7 @@ import (
"fmt"
"path/filepath"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -43,7 +43,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return &filer_pb.AtomicRenameEntryResponse{}, nil
}
-func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events, func() error {
if entry.IsDirectory() {
@@ -59,7 +59,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e
return nil
}
-func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
currentDirPath := oldParent.Child(entry.Name())
newDirPath := newParent.Child(newName)
@@ -70,7 +70,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
includeLastFile := false
for {
- entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024)
+ entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "")
if err != nil {
return err
}
@@ -92,7 +92,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
return nil
}
-func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents,
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents,
moveFolderSubEntries func() error) error {
oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
@@ -105,12 +105,13 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
}
// add to new directory
- newEntry := &filer2.Entry{
+ newEntry := &filer.Entry{
FullPath: newPath,
Attr: entry.Attr,
Chunks: entry.Chunks,
+ Extended: entry.Extended,
}
- createErr := fs.filer.CreateEntry(ctx, newEntry, false, false)
+ createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, nil)
if createErr != nil {
return createErr
}
@@ -124,7 +125,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
}
// delete old entry
- deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false)
+ deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, nil)
if deleteErr != nil {
return deleteErr
}
@@ -136,6 +137,6 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
}
type MoveEvents struct {
- oldEntries []*filer2.Entry
- newEntries []*filer2.Entry
+ oldEntries []*filer.Entry
+ newEntries []*filer.Entry
}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 4341f2091..634fb5211 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -2,12 +2,13 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
"strings"
"time"
"github.com/golang/protobuf/proto"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -24,7 +25,7 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName)
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
@@ -37,12 +38,21 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
lastReadTime = time.Unix(0, processedTsNs)
}
- err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
- fs.filer.MetaAggregator.ListenersLock.Lock()
- fs.filer.MetaAggregator.ListenersCond.Wait()
- fs.filer.MetaAggregator.ListenersLock.Unlock()
- return true
- }, eachLogEntryFn)
+ for {
+ lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.filer.MetaAggregator.ListenersLock.Lock()
+ fs.filer.MetaAggregator.ListenersCond.Wait()
+ fs.filer.MetaAggregator.ListenersLock.Unlock()
+ return true
+ }, eachLogEntryFn)
+ if err != nil {
+ glog.Errorf("processed to %v: %v", lastReadTime, err)
+ time.Sleep(3127 * time.Millisecond)
+ if err != log_buffer.ResumeError {
+ break
+ }
+ }
+ }
return err
@@ -59,30 +69,37 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- eachEventNotificationFn := eachEventNotificationFn(req, stream, clientName)
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
- if _, ok := fs.filer.Store.ActualStore.(filer2.FilerLocalStore); ok {
- // println("reading from persisted logs ...")
- processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
- if err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
- }
+ // println("reading from persisted logs ...")
+ processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
- glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
}
+ glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
// println("reading from in memory logs ...")
- err := fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
- fs.listenersLock.Lock()
- fs.listenersCond.Wait()
- fs.listenersLock.Unlock()
- return true
- }, eachLogEntryFn)
+ for {
+ lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.listenersLock.Lock()
+ fs.listenersCond.Wait()
+ fs.listenersLock.Unlock()
+ return true
+ }, eachLogEntryFn)
+ if err != nil {
+ glog.Errorf("processed to %v: %v", lastReadTime, err)
+ time.Sleep(3127 * time.Millisecond)
+ if err != log_buffer.ResumeError {
+ break
+ }
+ }
+ }
return err
@@ -104,9 +121,22 @@ func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotificati
}
}
-func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+ foundSelf := false
+ for _, sig := range eventNotification.Signatures {
+ if sig == clientSignature && clientSignature != 0 {
+ return nil
+ }
+ if sig == fs.filer.Signature {
+ foundSelf = true
+ }
+ }
+ if !foundSelf {
+ eventNotification.Signatures = append(eventNotification.Signatures, fs.filer.Signature)
+ }
+
// get complete path to the file or directory
var entryName string
if eventNotification.OldEntry != nil {
@@ -118,7 +148,7 @@ func eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream file
fullpath := util.Join(dirPath, entryName)
// skip on filer internal meta logs
- if strings.HasPrefix(fullpath, filer2.SystemLogDir) {
+ if strings.HasPrefix(fullpath, filer.SystemLogDir) {
return nil
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 6995c7cfe..59c149cef 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"net/http"
"os"
"sync"
@@ -15,19 +16,19 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
@@ -58,9 +59,13 @@ type FilerOption struct {
type FilerServer struct {
option *FilerOption
secret security.SigningKey
- filer *filer2.Filer
+ filer *filer.Filer
grpcDialOption grpc.DialOption
+ // metrics read from the master
+ metricsAddress string
+ metricsIntervalSec int
+
// notifying clients
listenersLock sync.Mutex
listenersCond *sync.Cond
@@ -82,13 +87,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
+ fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
fs.listenersCond.Broadcast()
})
fs.filer.Cipher = option.Cipher
- maybeStartMetrics(fs, option)
+ fs.checkWithMaster()
+ go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
go fs.filer.KeepConnectedToMaster()
v := util.GetViper()
@@ -130,9 +136,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
return fs, nil
}
-func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
+func (fs *FilerServer) checkWithMaster() {
- for _, master := range option.Masters {
+ for _, master := range fs.option.Masters {
_, err := pb.ParseFilerGrpcAddress(master)
if err != nil {
glog.Fatalf("invalid master address %s: %v", master, err)
@@ -140,12 +146,19 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
}
isConnected := false
- var metricsAddress string
- var metricsIntervalSec int
- var readErr error
for !isConnected {
- for _, master := range option.Masters {
- metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
+ for _, master := range fs.option.Masters {
+ readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", master, err)
+ }
+ fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ if fs.option.DefaultReplication == "" {
+ fs.option.DefaultReplication = resp.DefaultReplication
+ }
+ return nil
+ })
if readErr == nil {
isConnected = true
} else {
@@ -153,23 +166,5 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
}
}
}
- if metricsAddress == "" && metricsIntervalSec <= 0 {
- return
- }
- go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
- func() (addr string, intervalSeconds int) {
- return metricsAddress, metricsIntervalSec
- })
-}
-func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
- err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
- resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
- if err != nil {
- return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
- }
- metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
- return nil
- })
- return
}
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index b6bfc3b04..18f78881c 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"time"
@@ -8,6 +9,7 @@ import (
)
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
start := time.Now()
switch r.Method {
case "GET":
@@ -34,6 +36,7 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
}
func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
start := time.Now()
switch r.Method {
case "GET":
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 657158c2f..fbd45d6b9 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -11,7 +11,7 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -94,7 +94,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
// set etag
- etag := filer2.ETagEntry(entry)
+ etag := filer.ETagEntry(entry)
if inm := r.Header.Get("If-None-Match"); inm == "\""+etag+"\"" {
w.WriteHeader(http.StatusNotModified)
return
@@ -105,17 +105,17 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
adjustHeaderContentDisposition(w, r, filename)
if r.Method == "HEAD" {
- w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
+ w.Header().Set("Content-Length", strconv.FormatInt(int64(entry.Size()), 10))
return
}
- totalSize := int64(filer2.TotalSize(entry.Chunks))
+ totalSize := int64(entry.Size())
if rangeReq := r.Header.Get("Range"); rangeReq == "" {
ext := filepath.Ext(filename)
width, height, mode, shouldResize := shouldResizeImages(ext, r)
if shouldResize {
- data, err := filer2.ReadAll(fs.filer.MasterClient, entry.Chunks)
+ data, err := filer.ReadAll(fs.filer.MasterClient, entry.Chunks)
if err != nil {
glog.Errorf("failed to read %s: %v", path, err)
w.WriteHeader(http.StatusNotModified)
@@ -128,7 +128,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
- return filer2.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
+ return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
})
}
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index ae28fc1db..9ca0209f4 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -32,7 +32,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName := r.FormValue("lastFileName")
- entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit)
+ entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, "")
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index da66178ce..0091ae3ce 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -2,22 +2,11 @@ package weed_server
import (
"context"
- "crypto/md5"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "mime"
"net/http"
- "net/url"
"os"
- filenamePath "path"
- "strconv"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -98,206 +87,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ttlSeconds = int32(ttl.Minutes()) * 60
}
- if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked {
- return
- }
+ fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync)
- if fs.option.Cipher {
- reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync)
- if err != nil {
- writeJsonError(w, r, http.StatusInternalServerError, err)
- } else if reply != nil {
- writeJsonQuiet(w, r, http.StatusCreated, reply)
- }
-
- return
- }
-
- fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
-
- if err != nil || fileId == "" || urlLocation == "" {
- glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
- writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter))
- return
- }
-
- glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
-
- u, _ := url.Parse(urlLocation)
- ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
- if err != nil {
- return
- }
-
- if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil {
- return
- }
-
- // send back post result
- reply := FilerPostResult{
- Name: ret.Name,
- Size: int64(ret.Size),
- Error: ret.Error,
- Fid: fileId,
- Url: urlLocation,
- }
- setEtag(w, ret.ETag)
- writeJsonQuiet(w, r, http.StatusCreated, reply)
-}
-
-// update metadata in filer store
-func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string,
- collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) {
-
- stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
- start := time.Now()
- defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds())
- }()
-
- modeStr := r.URL.Query().Get("mode")
- if modeStr == "" {
- modeStr = "0660"
- }
- mode, err := strconv.ParseUint(modeStr, 8, 32)
- if err != nil {
- glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
- mode = 0660
- }
-
- path := r.URL.Path
- if strings.HasSuffix(path, "/") {
- if ret.Name != "" {
- path += ret.Name
- }
- }
- existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
- crTime := time.Now()
- if err == nil && existingEntry != nil {
- crTime = existingEntry.Crtime
- }
- entry := &filer2.Entry{
- FullPath: util.FullPath(path),
- Attr: filer2.Attr{
- Mtime: time.Now(),
- Crtime: crTime,
- Mode: os.FileMode(mode),
- Uid: OS_UID,
- Gid: OS_GID,
- Replication: replication,
- Collection: collection,
- TtlSec: ttlSeconds,
- Mime: ret.Mime,
- Md5: md5value,
- },
- Chunks: []*filer_pb.FileChunk{{
- FileId: fileId,
- Size: uint64(ret.Size),
- Mtime: time.Now().UnixNano(),
- ETag: ret.ETag,
- }},
- }
- if entry.Attr.Mime == "" {
- if ext := filenamePath.Ext(path); ext != "" {
- entry.Attr.Mime = mime.TypeByExtension(ext)
- }
- }
- // glog.V(4).Infof("saving %s => %+v", path, entry)
- if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil {
- fs.filer.DeleteChunks(entry.Chunks)
- glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
- writeJsonError(w, r, http.StatusInternalServerError, dbErr)
- err = dbErr
- return
- }
-
- return nil
-}
-
-// send request to volume server
-func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) {
-
- stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
- start := time.Now()
- defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }()
-
- ret = &operation.UploadResult{}
-
- md5Hash := md5.New()
- body := r.Body
- if r.Method == "PUT" {
- // only PUT or large chunked files has Md5 in attributes
- body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash))
- }
-
- request := &http.Request{
- Method: r.Method,
- URL: u,
- Proto: r.Proto,
- ProtoMajor: r.ProtoMajor,
- ProtoMinor: r.ProtoMinor,
- Header: r.Header,
- Body: body,
- Host: r.Host,
- ContentLength: r.ContentLength,
- }
-
- if auth != "" {
- request.Header.Set("Authorization", "BEARER "+string(auth))
- }
- resp, doErr := util.Do(request)
- if doErr != nil {
- glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method)
- writeJsonError(w, r, http.StatusInternalServerError, doErr)
- err = doErr
- return
- }
- defer func() {
- io.Copy(ioutil.Discard, resp.Body)
- resp.Body.Close()
- }()
-
- respBody, raErr := ioutil.ReadAll(resp.Body)
- if raErr != nil {
- glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error())
- writeJsonError(w, r, http.StatusInternalServerError, raErr)
- err = raErr
- return
- }
-
- glog.V(4).Infoln("post result", string(respBody))
- unmarshalErr := json.Unmarshal(respBody, &ret)
- if unmarshalErr != nil {
- glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody))
- writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr)
- err = unmarshalErr
- return
- }
- if ret.Error != "" {
- err = errors.New(ret.Error)
- glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
- writeJsonError(w, r, http.StatusInternalServerError, err)
- return
- }
- // find correct final path
- path := r.URL.Path
- if strings.HasSuffix(path, "/") {
- if ret.Name != "" {
- path += ret.Name
- } else {
- err = fmt.Errorf("can not to write to folder %s without a file name", path)
- fs.filer.DeleteFileByFileId(fileId)
- glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
- writeJsonError(w, r, http.StatusInternalServerError, err)
- return
- }
- }
- // use filer calculated md5 ETag, instead of the volume server crc ETag
- if r.Method == "PUT" {
- md5value = md5Hash.Sum(nil)
- }
- ret.ETag = getEtag(resp)
- return
}
// curl -X DELETE http://localhost:8888/path/to
@@ -320,7 +111,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
objectPath = objectPath[0 : len(objectPath)-1]
}
- err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false)
+ err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil)
if err != nil {
glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
httpStatus := http.StatusInternalServerError
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index be0438efb..2b37e3c5d 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -3,15 +3,18 @@ package weed_server
import (
"context"
"crypto/md5"
+ "fmt"
+ "hash"
"io"
"io/ioutil"
"net/http"
+ "os"
"path"
"strconv"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -21,11 +24,7 @@ import (
)
func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
- replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool {
- if r.Method != "POST" {
- glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
- return false
- }
+ replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) {
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
query := r.URL.Query()
@@ -35,54 +34,47 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
if maxMB <= 0 && fs.option.MaxMB > 0 {
maxMB = int32(fs.option.MaxMB)
}
- if maxMB <= 0 {
- glog.V(4).Infoln("AutoChunking not enabled")
- return false
- }
- glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)")
chunkSize := 1024 * 1024 * maxMB
- contentLength := int64(0)
- if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 {
- contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64)
- if contentLength <= int64(chunkSize) {
- glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.")
- return false
- }
- }
+ stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
+ }()
- if contentLength <= 0 {
- glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.")
- return false
+ var reply *FilerPostResult
+ var err error
+ var md5bytes []byte
+ if r.Method == "POST" {
+ if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
+ reply, err = fs.mkdir(ctx, w, r)
+ } else {
+ reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
+ }
+ } else {
+ reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
}
-
- reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil {
+ if len(md5bytes) > 0 {
+ w.Header().Set("Content-MD5", util.Base64Encode(md5bytes))
+ }
writeJsonQuiet(w, r, http.StatusCreated, reply)
}
- return true
}
-func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
- contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) {
-
- stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
- start := time.Now()
- defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
- }()
+func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil {
- return nil, multipartReaderErr
+ return nil, nil, multipartReaderErr
}
part1, part1Err := multipartReader.NextPart()
if part1Err != nil {
- return nil, part1Err
+ return nil, nil, part1Err
}
fileName := part1.FileName()
@@ -90,54 +82,63 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
fileName = path.Base(fileName)
}
contentType := part1.Header.Get("Content-Type")
+ if contentType == "application/octet-stream" {
+ contentType = ""
+ }
- var fileChunks []*filer_pb.FileChunk
-
- md5Hash := md5.New()
- var partReader = ioutil.NopCloser(io.TeeReader(part1, md5Hash))
-
- chunkOffset := int64(0)
-
- for chunkOffset < contentLength {
- limitedReader := io.LimitReader(partReader, int64(chunkSize))
-
- // assign one file id for one chunk
- fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
- if assignErr != nil {
- return nil, assignErr
- }
+ fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync)
+ if err != nil {
+ return nil, nil, err
+ }
- // upload the chunk to the volume server
- uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
- if uploadErr != nil {
- return nil, uploadErr
- }
+ fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
+ if replyerr != nil {
+ glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+ return
+ }
- // if last chunk exhausted the reader exactly at the border
- if uploadResult.Size == 0 {
- break
- }
+ md5bytes = md5Hash.Sum(nil)
+ filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset)
- // Save to chunk manifest structure
- fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
+ return
+}
- glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
+func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
- // reset variables for the next chunk
- chunkOffset = chunkOffset + int64(uploadResult.Size)
+ fileName := ""
+ contentType := ""
- // if last chunk was not at full chunk size, but already exhausted the reader
- if int64(uploadResult.Size) < int64(chunkSize) {
- break
- }
+ fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, replication, collection, dataCenter, ttlString, fileName, contentType, fsync)
+ if err != nil {
+ return nil, nil, err
}
- fileChunks, replyerr = filer2.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
+ fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, ttlString, fsync), fileChunks)
if replyerr != nil {
glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
return
}
+ md5bytes = md5Hash.Sum(nil)
+ filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset)
+
+ return
+}
+
+func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) {
+
+ // detect file mode
+ modeStr := r.URL.Query().Get("mode")
+ if modeStr == "" {
+ modeStr = "0660"
+ }
+ mode, err := strconv.ParseUint(modeStr, 8, 32)
+ if err != nil {
+ glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+ mode = 0660
+ }
+
+ // fix the path
path := r.URL.Path
if strings.HasSuffix(path, "/") {
if fileName != "" {
@@ -145,20 +146,28 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
}
}
+ // fix the crTime
+ existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+ crTime := time.Now()
+ if err == nil && existingEntry != nil {
+ crTime = existingEntry.Crtime
+ }
+
glog.V(4).Infoln("saving", path)
- entry := &filer2.Entry{
+ entry := &filer.Entry{
FullPath: util.FullPath(path),
- Attr: filer2.Attr{
+ Attr: filer.Attr{
Mtime: time.Now(),
- Crtime: time.Now(),
- Mode: 0660,
+ Crtime: crTime,
+ Mode: os.FileMode(mode),
Uid: OS_UID,
Gid: OS_GID,
Replication: replication,
Collection: collection,
TtlSec: ttlSec,
Mime: contentType,
- Md5: md5Hash.Sum(nil),
+ Md5: md5bytes,
+ FileSize: uint64(chunkOffset),
},
Chunks: fileChunks,
}
@@ -168,15 +177,57 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
Size: chunkOffset,
}
- if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
fs.filer.DeleteChunks(entry.Chunks)
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
- return
}
+ return filerResult, replyerr
+}
- return
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, replication string, collection string, dataCenter string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) {
+ var fileChunks []*filer_pb.FileChunk
+
+ md5Hash := md5.New()
+ var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+
+ chunkOffset := int64(0)
+
+ for {
+ limitedReader := io.LimitReader(partReader, int64(chunkSize))
+
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, ttlString, fsync)
+ if assignErr != nil {
+ return nil, nil, 0, assignErr
+ }
+
+ // upload the chunk to the volume server
+ uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
+ if uploadErr != nil {
+ return nil, nil, 0, uploadErr
+ }
+
+ // if last chunk exhausted the reader exactly at the border
+ if uploadResult.Size == 0 {
+ break
+ }
+
+ // Save to chunk manifest structure
+ fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
+
+ glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
+
+ // reset variables for the next chunk
+ chunkOffset = chunkOffset + int64(uploadResult.Size)
+
+ // if last chunk was not at full chunk size, but already exhausted the reader
+ if int64(uploadResult.Size) < int64(chunkSize) {
+ break
+ }
+ }
+ return fileChunks, md5Hash, chunkOffset, nil
}
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) {
@@ -191,7 +242,7 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
return uploadResult, err
}
-func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, ttlString string, fsync bool) filer2.SaveDataAsChunkFunctionType {
+func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, ttlString string, fsync bool) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) {
// assign one file id for one chunk
@@ -210,3 +261,51 @@ func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCe
}
}
+func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http.Request) (filerResult *FilerPostResult, replyerr error) {
+
+ // detect file mode
+ modeStr := r.URL.Query().Get("mode")
+ if modeStr == "" {
+ modeStr = "0660"
+ }
+ mode, err := strconv.ParseUint(modeStr, 8, 32)
+ if err != nil {
+ glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+ mode = 0660
+ }
+
+ // fix the path
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ path = path[:len(path)-1]
+ }
+
+ existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if err == nil && existingEntry != nil {
+ replyerr = fmt.Errorf("dir %s already exists", path)
+ return
+ }
+
+ glog.V(4).Infoln("mkdir", path)
+ entry := &filer.Entry{
+ FullPath: util.FullPath(path),
+ Attr: filer.Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: os.FileMode(mode) | os.ModeDir,
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }
+
+ filerResult = &FilerPostResult{
+ Name: util.FullPath(path).Name(),
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
+ replyerr = dbErr
+ filerResult.Error = dbErr.Error()
+ glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr)
+ }
+ return filerResult, replyerr
+}
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index 8413496b8..60082a8d4 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -7,7 +7,7 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -58,9 +58,9 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
}
}
- entry := &filer2.Entry{
+ entry := &filer.Entry{
FullPath: util.FullPath(path),
- Attr: filer2.Attr{
+ Attr: filer.Attr{
Mtime: time.Now(),
Crtime: time.Now(),
Mode: 0660,
@@ -70,6 +70,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
Collection: collection,
TtlSec: ttlSeconds,
Mime: pu.MimeType,
+ Md5: util.Base64Md5ToBytes(pu.ContentMd5),
},
Chunks: fileChunks,
}
@@ -79,7 +80,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
Size: int64(pu.OriginalDataSize),
}
- if dbErr := fs.filer.CreateEntry(ctx, entry, false, false); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
fs.filer.DeleteChunks(entry.Chunks)
err = dbErr
filerResult.Error = dbErr.Error()
diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go
index e532b27e2..04a81433b 100644
--- a/weed/server/filer_ui/templates.go
+++ b/weed/server/filer_ui/templates.go
@@ -3,10 +3,19 @@ package master_ui
import (
"github.com/dustin/go-humanize"
"html/template"
+ "net/url"
+ "strings"
)
+func printpath(parts ...string) string {
+ concat := strings.Join(parts, "")
+ escaped := url.PathEscape(concat)
+ return strings.ReplaceAll(escaped, "%2F", "/")
+}
+
var funcMap = template.FuncMap{
"humanizeBytes": humanize.Bytes,
+ "printpath": printpath,
}
var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html>
@@ -50,7 +59,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<div class="row">
<div>
{{ range $entry := .Breadcrumbs }}
- <a href="{{ $entry.Link }}" >
+ <a href="{{ printpath $entry.Link }}" >
{{ $entry.Name }}
</a>
{{ end }}
@@ -69,11 +78,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<td>
{{if $entry.IsDirectory}}
<img src="/seaweedfsstatic/images/folder.gif" width="20" height="23">
- <a href={{ print $path "/" $entry.Name "/"}} >
+ <a href="{{ printpath $path "/" $entry.Name "/"}}" >
{{ $entry.Name }}
</a>
{{else}}
- <a href={{ print $path "/" $entry.Name }} >
+ <a href="{{ printpath $path "/" $entry.Name }}" >
{{ $entry.Name }}
</a>
{{end}}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 1ee214deb..692909a29 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -12,21 +12,19 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
)
func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
var dn *topology.DataNode
- t := ms.Topo
defer func() {
if dn != nil {
// if the volume server disconnects and reconnects quickly
// the unregister and register can race with each other
- t.UnRegisterDataNode(dn)
+ ms.Topo.UnRegisterDataNode(dn)
glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
message := &master_pb.VolumeLocation{
@@ -62,21 +60,18 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
return err
}
- t.Sequence.SetMax(heartbeat.MaxFileKey)
+ ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
if dn == nil {
- dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
- dc := t.GetOrCreateDataCenter(dcName)
+ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn = rack.GetOrCreateDataNode(heartbeat.Ip,
int(heartbeat.Port), heartbeat.PublicUrl,
int64(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
- VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
- MetricsAddress: ms.option.MetricsAddress,
- MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
- StorageBackends: backend.ToPbStorageBackends(),
+ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
}); err != nil {
glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err
@@ -102,12 +97,12 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
message.DeletedVids = append(message.DeletedVids, volInfo.Id)
}
// update master internal volume layouts
- t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
+ ms.Topo.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
}
if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
// process heartbeat.Volumes
- newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+ newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
for _, v := range newVolumes {
glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
@@ -122,7 +117,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
// update master internal volume layouts
- t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
+ ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
for _, s := range heartbeat.NewEcShards {
message.NewVids = append(message.NewVids, s.Id)
@@ -137,8 +132,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
- glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
- newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
+ glog.V(1).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
+ newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn)
// broadcast the ec vid changes to master clients
for _, s := range newShards {
@@ -163,7 +158,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
// tell the volume servers about the leader
- newLeader, err := t.Leader()
+ newLeader, err := ms.Topo.Leader()
if err != nil {
glog.Warningf("SendHeartbeat find leader: %v", err)
return err
@@ -192,7 +187,8 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
peerAddress := findClientAddress(stream.Context(), req.GrpcPort)
- stopChan := make(chan bool)
+ // buffer by 1 so we don't end up getting stuck writing to stopChan forever
+ stopChan := make(chan bool, 1)
clientName, messageChan := ms.addClient(req.Name, peerAddress)
@@ -252,7 +248,12 @@ func (ms *MasterServer) addClient(clientType string, clientAddress string) (clie
clientName = clientType + "@" + clientAddress
glog.V(0).Infof("+ client %v", clientName)
- messageChan = make(chan *master_pb.VolumeLocation)
+ // we buffer this because otherwise we end up in a potential deadlock where
+ // the KeepConnected loop is no longer listening on this channel but we're
+ // trying to send to it in SendHeartbeat and so we can't lock the
+ // clientChansLock to remove the channel and we're stuck writing to it
+ // 100 is probably overkill
+ messageChan = make(chan *master_pb.VolumeLocation, 100)
ms.clientChansLock.Lock()
ms.clientChans[clientName] = messageChan
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 282c75679..6a320dfb1 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/raft"
@@ -184,6 +185,8 @@ func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_
resp := &master_pb.GetMasterConfigurationResponse{
MetricsAddress: ms.option.MetricsAddress,
MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ StorageBackends: backend.ToPbStorageBackends(),
+ DefaultReplication: ms.option.DefaultReplicaPlacement,
}
return resp, nil
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 377fac26f..657b170c2 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -7,7 +7,6 @@ import (
"net/url"
"os"
"regexp"
- "strconv"
"strings"
"sync"
"time"
@@ -210,7 +209,7 @@ func (ms *MasterServer) startAdminScripts() {
scriptLines = append(scriptLines, "unlock")
}
- masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
+ masterAddress := fmt.Sprintf("%s:%d", ms.option.Host, ms.option.Port)
var shellOptions shell.ShellOptions
shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 7595c0171..34235384f 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -110,7 +110,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
} else {
url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path
}
- http.Redirect(w, r, url, http.StatusMovedPermanently)
+ http.Redirect(w, r, url, http.StatusPermanentRedirect)
} else {
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error))
}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index eaf5aaf6e..9296c63e9 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -149,7 +149,35 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
}
return resp, err
+}
+
+func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
+
+ resp := &volume_server_pb.VolumeMarkWritableResponse{}
+
+ err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
+
+ if err != nil {
+ glog.Errorf("volume mark writable %v: %v", req, err)
+ } else {
+ glog.V(2).Infof("volume mark writable %v", req)
+ }
+
+ return resp, err
+}
+
+func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) {
+ resp := &volume_server_pb.VolumeStatusResponse{}
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ resp.IsReadOnly = v.IsReadOnly()
+
+ return resp, nil
}
func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
@@ -168,6 +196,16 @@ func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_serv
}
+func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) {
+
+ resp := &volume_server_pb.VolumeServerLeaveResponse{}
+
+ vs.StopHeartbeat()
+
+ return resp, nil
+
+}
+
func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {
resp := &volume_server_pb.VolumeNeedleStatusResponse{}
@@ -188,7 +226,7 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
}
count, err = vs.store.ReadEcShardNeedle(volumeId, n)
} else {
- count, err = vs.store.ReadVolumeNeedle(volumeId, n)
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil)
}
if err != nil {
return nil, err
@@ -199,7 +237,7 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
resp.NeedleId = uint64(n.Id)
resp.Cookie = uint32(n.Cookie)
- resp.Size = n.Size
+ resp.Size = uint32(n.Size)
resp.LastModified = n.LastModified
resp.Crc = n.Checksum.Value()
if n.HasTtl() {
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go
index 501964191..8e84dc2a8 100644
--- a/weed/server/volume_grpc_batch_delete.go
+++ b/weed/server/volume_grpc_batch_delete.go
@@ -41,7 +41,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
} else {
n.ParsePath(id_cookie)
cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusNotFound,
@@ -79,7 +79,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusAccepted,
- Size: size},
+ Size: uint32(size)},
)
}
}
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 7cb836344..8698a4c64 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,7 +2,7 @@ package weed_server
import (
"fmt"
- "net"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"time"
"google.golang.org/grpc"
@@ -22,6 +22,31 @@ import (
func (vs *VolumeServer) GetMaster() string {
return vs.currentMaster
}
+
+func (vs *VolumeServer) checkWithMaster() (err error) {
+ isConnected := false
+ for !isConnected {
+ for _, master := range vs.SeedMasterNodes {
+ err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", master, err)
+ }
+ vs.metricsAddress, vs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ backend.LoadFromPbStorageBackends(resp.StorageBackends)
+ return nil
+ })
+ if err == nil {
+ return
+ } else {
+ glog.V(0).Infof("checkWithMaster %s: %v", master, err)
+ }
+ }
+ time.Sleep(1790 * time.Millisecond)
+ }
+ return
+}
+
func (vs *VolumeServer) heartbeat() {
glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes)
@@ -32,7 +57,7 @@ func (vs *VolumeServer) heartbeat() {
var err error
var newLeader string
- for {
+ for vs.isHeartbeating {
for _, master := range vs.SeedMasterNodes {
if newLeader != "" {
// the new leader may actually is the same master
@@ -53,20 +78,35 @@ func (vs *VolumeServer) heartbeat() {
newLeader = ""
vs.store.MasterAddress = ""
}
+ if !vs.isHeartbeating {
+ break
+ }
}
}
}
+func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
+ if !vs.isHeartbeating {
+ return true
+ }
+ vs.isHeartbeating = false
+ vs.stopChan <- true
+ return false
+}
+
func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
- grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
}
defer grpcConection.Close()
client := master_pb.NewSeaweedClient(grpcConection)
- stream, err := client.SendHeartbeat(context.Background())
+ stream, err := client.SendHeartbeat(ctx)
if err != nil {
glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
return "", err
@@ -87,23 +127,16 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
if vs.store.MaybeAdjustVolumeMax() {
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
}
}
}
- if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) {
- glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
+ if in.GetLeader() != "" && vs.currentMaster != in.GetLeader() {
+ glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
newLeader = in.GetLeader()
doneChan <- nil
return
}
- if in.GetMetricsAddress() != "" && vs.MetricsAddress != in.GetMetricsAddress() {
- vs.MetricsAddress = in.GetMetricsAddress()
- vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds())
- }
- if len(in.StorageBackends) > 0 {
- backend.LoadFromPbStorageBackends(in.StorageBackends)
- }
}
}()
@@ -182,19 +215,8 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
}
case err = <-doneChan:
return
+ case <-vs.stopChan:
+ return
}
}
}
-
-func isSameIP(ip string, host string) bool {
- ips, err := net.LookupIP(host)
- if err != nil {
- return false
- }
- for _, t := range ips {
- if ip == t.String() {
- return true
- }
- }
- return false
-}
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 5c7d5572c..cd2b53c8a 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -27,17 +27,12 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId)
- err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
- if err != nil {
- return nil, fmt.Errorf("failed to mount existing volume %d: %v", req.VolumeId, err)
- }
-
- err = vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
+ err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
if err != nil {
return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
}
- glog.V(0).Infof("deleted exisitng volume %d before copying.", req.VolumeId)
+ glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
}
location := vs.store.FindFreeLocation()
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 79348c9d7..55e0261c8 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -272,7 +272,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
if req.FileKey != 0 {
_, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
- if size == types.TombstoneFileSize {
+ if size.IsDeleted() {
return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
IsDeleted: true,
})
@@ -340,7 +340,7 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
if err != nil {
return nil, fmt.Errorf("locate in local ec volume: %v", err)
}
- if size == types.TombstoneFileSize {
+ if size.IsDeleted() {
return resp, nil
}
diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go
index 767e28e7b..2f4fab96a 100644
--- a/weed/server/volume_grpc_query.go
+++ b/weed/server/volume_grpc_query.go
@@ -24,7 +24,7 @@ func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_
n.ParsePath(id_cookie)
cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
return err
}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 3af37b491..83df32fdd 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -28,9 +28,11 @@ type VolumeServer struct {
FixJpgOrientation bool
ReadRedirect bool
compactionBytePerSecond int64
- MetricsAddress string
- MetricsIntervalSec int
+ metricsAddress string
+ metricsIntervalSec int
fileSizeLimitBytes int64
+ isHeartbeating bool
+ stopChan chan bool
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
@@ -66,16 +68,21 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
+ isHeartbeating: true,
+ stopChan: make(chan bool),
}
vs.SeedMasterNodes = masterNodes
+
+ vs.checkWithMaster()
+
vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux)
+ adminMux.HandleFunc("/status", vs.statusHandler)
if signingKey == "" || enableUiAccess {
// only expose the volume server details for safe environments
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
- adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
/*
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
@@ -90,11 +97,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
}
go vs.heartbeat()
- hostAddress := fmt.Sprintf("%s:%d", ip, port)
- go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather,
- func() (addr string, intervalSeconds int) {
- return vs.MetricsAddress, vs.MetricsIntervalSec
- })
+ go stats.LoopPushingMetric("volumeServer", fmt.Sprintf("%s:%d", ip, port), vs.metricsAddress, vs.metricsIntervalSec)
return vs
}
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 14ad27d42..ad13cdf3b 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"strings"
@@ -25,6 +26,7 @@ security settings:
*/
func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
switch r.Method {
case "GET", "HEAD":
stats.ReadRequest()
@@ -39,6 +41,7 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
}
func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
switch r.Method {
case "GET":
stats.ReadRequest()
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 34655d833..4d84c9c4d 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -10,6 +10,7 @@ import (
)
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
m := make(map[string]interface{})
m["Version"] = util.Version()
var ds []*volume_server_pb.DiskStatus
@@ -24,6 +25,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
m := make(map[string]interface{})
m["Version"] = util.Version()
var ds []*volume_server_pb.DiskStatus
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index d730600e4..bb04678d6 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -18,6 +18,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -81,15 +82,20 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
cookie := n.Cookie
+
+ readOption := &storage.ReadOption{
+ ReadDeleted: r.FormValue("readDeleted") == "true",
+ }
+
var count int
if hasVolume {
- count, err = vs.store.ReadVolumeNeedle(volumeId, n)
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption)
} else if hasEcVolume {
count, err = vs.store.ReadEcShardNeedle(volumeId, n)
}
// glog.V(4).Infoln("read bytes", count, "error", err)
if err != nil || count < 0 {
- glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
+ glog.V(3).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
w.WriteHeader(http.StatusNotFound)
return
}
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index 8b2027e7b..e535327e2 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -13,6 +13,7 @@ import (
)
func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
infos := make(map[string]interface{})
infos["Up Time"] = time.Now().Sub(startTime).String()
var ds []*volume_server_pb.DiskStatus
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 74dad28de..78cbf08c5 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -42,7 +42,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- reqNeedle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
+ reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
if ne != nil {
writeJsonError(w, r, http.StatusBadRequest, ne)
return
@@ -70,6 +70,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ret.ETag = reqNeedle.Etag()
ret.Mime = string(reqNeedle.Mime)
setEtag(w, ret.ETag)
+ w.Header().Set("Content-MD5", contentMd5)
writeJsonQuiet(w, r, httpStatus, ret)
}
@@ -103,7 +104,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return
}
- _, ok := vs.store.ReadVolumeNeedle(volumeId, n)
+ _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil)
if ok != nil {
m := make(map[string]uint32)
m["size"] = 0
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 8655daf70..f13e73a7b 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -10,7 +10,6 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/util/grace"
"golang.org/x/net/webdav"
"google.golang.org/grpc"
@@ -20,7 +19,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
)
@@ -42,7 +41,7 @@ type WebDavOption struct {
type WebDavServer struct {
option *WebDavOption
secret security.SigningKey
- filer *filer2.Filer
+ filer *filer.Filer
grpcDialOption grpc.DialOption
Handler *webdav.Handler
}
@@ -68,9 +67,10 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
type WebDavFileSystem struct {
option *WebDavOption
secret security.SigningKey
- filer *filer2.Filer
+ filer *filer.Filer
grpcDialOption grpc.DialOption
- chunkCache *chunk_cache.ChunkCache
+ chunkCache *chunk_cache.TieredChunkCache
+ signature int32
}
type FileInfo struct {
@@ -94,19 +94,17 @@ type WebDavFile struct {
isDirectory bool
off int64
entry *filer_pb.Entry
- entryViewCache []filer2.VisibleInterval
+ entryViewCache []filer.VisibleInterval
reader io.ReaderAt
}
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
- chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB)
- grace.OnInterrupt(func() {
- chunkCache.Shutdown()
- })
+ chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB, 1024*1024)
return &WebDavFileSystem{
option: option,
chunkCache: chunkCache,
+ signature: util.RandomInt32(),
}, nil
}
@@ -169,6 +167,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
Gid: fs.option.Gid,
},
},
+ Signatures: []int32{fs.signature},
}
glog.V(1).Infof("mkdir: %v", request)
@@ -220,6 +219,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
TtlSec: 0,
},
},
+ Signatures: []int32{fs.signature},
}); err != nil {
return fmt.Errorf("create %s: %v", fullFilePath, err)
}
@@ -259,7 +259,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string)
dir, name := util.FullPath(fullFilePath).DirAndName()
- return filer_pb.Remove(fs, dir, name, true, false, false, false)
+ return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
}
@@ -338,7 +338,7 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F
if err != nil {
return nil, err
}
- fi.size = int64(filer2.TotalSize(entry.GetChunks()))
+ fi.size = int64(filer.FileSize(entry))
fi.name = string(fullpath)
fi.mode = os.FileMode(entry.Attributes.FileMode)
fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
@@ -426,8 +426,9 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
f.entry.Attributes.Replication = replication
request := &filer_pb.UpdateEntryRequest{
- Directory: dir,
- Entry: f.entry,
+ Directory: dir,
+ Entry: f.entry,
+ Signatures: []int32{f.fs.signature},
}
if _, err := client.UpdateEntry(ctx, request); err != nil {
@@ -470,16 +471,17 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
if err != nil {
return 0, err
}
- if len(f.entry.Chunks) == 0 {
+ fileSize := int64(filer.FileSize(f.entry))
+ if fileSize == 0 {
return 0, io.EOF
}
if f.entryViewCache == nil {
- f.entryViewCache, _ = filer2.NonOverlappingVisibleIntervals(filer2.LookupFn(f.fs), f.entry.Chunks)
+ f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks)
f.reader = nil
}
if f.reader == nil {
- chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32)
- f.reader = filer2.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache)
+ chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
+ f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize)
}
readSize, err = f.reader.ReadAt(p, f.off)
@@ -507,7 +509,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
fi := FileInfo{
- size: int64(filer2.TotalSize(entry.GetChunks())),
+ size: int64(filer.FileSize(entry)),
name: entry.Name,
mode: os.FileMode(entry.Attributes.FileMode),
modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
@@ -550,9 +552,9 @@ func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
var err error
switch whence {
- case 0:
+ case io.SeekStart:
f.off = 0
- case 2:
+ case io.SeekEnd:
if fi, err := f.fs.stat(ctx, f.name); err != nil {
return 0, err
} else {