Diffstat (limited to 'weed/server')
-rw-r--r--  weed/server/common.go | 2
-rw-r--r--  weed/server/filer_grpc_server.go | 201
-rw-r--r--  weed/server/filer_grpc_server_kv.go | 42
-rw-r--r--  weed/server/filer_grpc_server_listen.go | 108
-rw-r--r--  weed/server/filer_grpc_server_rename.go | 21
-rw-r--r--  weed/server/filer_grpc_server_sub_meta.go | 181
-rw-r--r--  weed/server/filer_server.go | 81
-rw-r--r--  weed/server/filer_server_handlers.go | 3
-rw-r--r--  weed/server/filer_server_handlers_read.go | 31
-rw-r--r--  weed/server/filer_server_handlers_read_dir.go | 46
-rw-r--r--  weed/server/filer_server_handlers_write.go | 236
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go | 270
-rw-r--r--  weed/server/filer_server_handlers_write_cipher.go | 15
-rw-r--r--  weed/server/filer_ui/templates.go | 34
-rw-r--r--  weed/server/master_grpc_server.go | 54
-rw-r--r--  weed/server/master_grpc_server_volume.go | 11
-rw-r--r--  weed/server/master_server.go | 20
-rw-r--r--  weed/server/master_server_handlers_admin.go | 2
-rw-r--r--  weed/server/master_ui/templates.go | 6
-rw-r--r--  weed/server/raft_server.go | 131
-rw-r--r--  weed/server/raft_server_handlers.go | 14
-rw-r--r--  weed/server/volume_grpc_admin.go | 80
-rw-r--r--  weed/server/volume_grpc_batch_delete.go | 4
-rw-r--r--  weed/server/volume_grpc_client_to_master.go | 76
-rw-r--r--  weed/server/volume_grpc_copy.go | 19
-rw-r--r--  weed/server/volume_grpc_erasure_coding.go | 32
-rw-r--r--  weed/server/volume_grpc_file.go | 129
-rw-r--r--  weed/server/volume_grpc_query.go | 2
-rw-r--r--  weed/server/volume_server.go | 23
-rw-r--r--  weed/server/volume_server_handlers.go | 3
-rw-r--r--  weed/server/volume_server_handlers_admin.go | 2
-rw-r--r--  weed/server/volume_server_handlers_read.go | 55
-rw-r--r--  weed/server/volume_server_handlers_ui.go | 1
-rw-r--r--  weed/server/volume_server_handlers_write.go | 10
-rw-r--r--  weed/server/webdav_server.go | 56
35 files changed, 1111 insertions, 890 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index bc6008864..44098a4b5 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -218,7 +218,7 @@ func handleStaticResources2(r *mux.Router) {
r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(statikFS)))
}
-func adjustHeadersAfterHEAD(w http.ResponseWriter, r *http.Request, filename string) {
+func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, filename string) {
if filename != "" {
contentDisposition := "inline"
if r.FormValue("dl") != "" {
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 901f798f0..8f326f5c7 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -6,14 +6,14 @@ import (
"os"
"path/filepath"
"strconv"
- "strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -32,11 +32,13 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L
return &filer_pb.LookupDirectoryEntryResponse{
Entry: &filer_pb.Entry{
- Name: req.Name,
- IsDirectory: entry.IsDirectory(),
- Attributes: filer2.EntryAttributeToPb(entry),
- Chunks: entry.Chunks,
- Extended: entry.Extended,
+ Name: req.Name,
+ IsDirectory: entry.IsDirectory(),
+ Attributes: filer.EntryAttributeToPb(entry),
+ Chunks: entry.Chunks,
+ Extended: entry.Extended,
+ HardLinkId: entry.HardLinkId,
+ HardLinkCounter: entry.HardLinkCounter,
},
}, nil
}
@@ -50,7 +52,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
limit = fs.option.DirListingLimit
}
- paginationLimit := filer2.PaginationSize
+ paginationLimit := filer.PaginationSize
if limit < paginationLimit {
paginationLimit = limit
}
@@ -58,7 +60,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName := req.StartFromFileName
includeLastFile := req.InclusiveStartFrom
for limit > 0 {
- entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit)
+ entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit, req.Prefix)
if err != nil {
return err
@@ -73,19 +75,15 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file
lastFileName = entry.Name()
- if req.Prefix != "" {
- if !strings.HasPrefix(entry.Name(), req.Prefix) {
- continue
- }
- }
-
if err := stream.Send(&filer_pb.ListEntriesResponse{
Entry: &filer_pb.Entry{
- Name: entry.Name(),
- IsDirectory: entry.IsDirectory(),
- Chunks: entry.Chunks,
- Attributes: filer2.EntryAttributeToPb(entry),
- Extended: entry.Extended,
+ Name: entry.Name(),
+ IsDirectory: entry.IsDirectory(),
+ Chunks: entry.Chunks,
+ Attributes: filer.EntryAttributeToPb(entry),
+ Extended: entry.Extended,
+ HardLinkId: entry.HardLinkId,
+ HardLinkCounter: entry.HardLinkCounter,
},
}); err != nil {
return err
@@ -137,28 +135,43 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
return resp, nil
}
+func (fs *FilerServer) lookupFileId(fileId string) (targetUrls []string, err error) {
+ fid, err := needle.ParseFileIdFromString(fileId)
+ if err != nil {
+ return nil, err
+ }
+ locations, found := fs.filer.MasterClient.GetLocations(uint32(fid.VolumeId))
+ if !found || len(locations) == 0 {
+ return nil, fmt.Errorf("not found volume %d in %s", fid.VolumeId, fileId)
+ }
+ for _, loc := range locations {
+ targetUrls = append(targetUrls, fmt.Sprintf("http://%s/%s", loc.Url, fileId))
+ }
+ return
+}
+
func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) {
- glog.V(4).Infof("CreateEntry %v", req)
+ glog.V(4).Infof("CreateEntry %v/%v", req.Directory, req.Entry.Name)
resp = &filer_pb.CreateEntryResponse{}
- chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
-
- if req.Entry.Attributes == nil {
- glog.V(3).Infof("CreateEntry %s: nil attributes", filepath.Join(req.Directory, req.Entry.Name))
- resp.Error = fmt.Sprintf("can not create entry with empty attributes")
- return
+ chunks, garbage, err2 := fs.cleanupChunks(nil, req.Entry)
+ if err2 != nil {
+ return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
}
- createErr := fs.filer.CreateEntry(ctx, &filer2.Entry{
- FullPath: util.JoinPath(req.Directory, req.Entry.Name),
- Attr: filer2.PbToEntryAttribute(req.Entry.Attributes),
- Chunks: chunks,
- }, req.OExcl)
+ createErr := fs.filer.CreateEntry(ctx, &filer.Entry{
+ FullPath: util.JoinPath(req.Directory, req.Entry.Name),
+ Attr: filer.PbToEntryAttribute(req.Entry.Attributes),
+ Chunks: chunks,
+ Extended: req.Entry.Extended,
+ HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
+ HardLinkCounter: req.Entry.HardLinkCounter,
+ }, req.OExcl, req.IsFromOtherCluster, req.Signatures)
if createErr == nil {
- fs.filer.DeleteChunks(garbages)
+ fs.filer.DeleteChunks(garbage)
} else {
glog.V(3).Infof("CreateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), createErr)
resp.Error = createErr.Error()
@@ -177,16 +190,18 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err)
}
- // remove old chunks if not included in the new ones
- unusedChunks := filer2.MinusChunks(entry.Chunks, req.Entry.Chunks)
-
- chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks)
+ chunks, garbage, err2 := fs.cleanupChunks(entry, req.Entry)
+ if err2 != nil {
+ return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("UpdateEntry cleanupChunks %s: %v", fullpath, err2)
+ }
- newEntry := &filer2.Entry{
- FullPath: util.JoinPath(req.Directory, req.Entry.Name),
- Attr: entry.Attr,
- Extended: req.Entry.Extended,
- Chunks: chunks,
+ newEntry := &filer.Entry{
+ FullPath: util.JoinPath(req.Directory, req.Entry.Name),
+ Attr: entry.Attr,
+ Extended: req.Entry.Extended,
+ Chunks: chunks,
+ HardLinkId: filer.HardLinkId(req.Entry.HardLinkId),
+ HardLinkCounter: req.Entry.HardLinkCounter,
}
glog.V(3).Infof("updating %s: %+v, chunks %d: %v => %+v, chunks %d: %v, extended: %v => %v",
@@ -209,22 +224,51 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr
}
- if filer2.EqualEntry(entry, newEntry) {
+ if filer.EqualEntry(entry, newEntry) {
return &filer_pb.UpdateEntryResponse{}, err
}
if err = fs.filer.UpdateEntry(ctx, entry, newEntry); err == nil {
- fs.filer.DeleteChunks(unusedChunks)
- fs.filer.DeleteChunks(garbages)
+ fs.filer.DeleteChunks(garbage)
+
+ fs.filer.NotifyUpdateEvent(ctx, entry, newEntry, true, req.IsFromOtherCluster, req.Signatures)
+
} else {
glog.V(3).Infof("UpdateEntry %s: %v", filepath.Join(req.Directory, req.Entry.Name), err)
}
- fs.filer.NotifyUpdateEvent(entry, newEntry, true)
-
return &filer_pb.UpdateEntryResponse{}, err
}
+func (fs *FilerServer) cleanupChunks(existingEntry *filer.Entry, newEntry *filer_pb.Entry) (chunks, garbage []*filer_pb.FileChunk, err error) {
+
+ // remove old chunks if not included in the new ones
+ if existingEntry != nil {
+ garbage, err = filer.MinusChunks(fs.lookupFileId, existingEntry.Chunks, newEntry.Chunks)
+ if err != nil {
+ return newEntry.Chunks, nil, fmt.Errorf("MinusChunks: %v", err)
+ }
+ }
+
+ // files with manifest chunks are usually large and append only, skip calculating covered chunks
+ manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(newEntry.Chunks)
+
+ chunks, coveredChunks := filer.CompactFileChunks(fs.lookupFileId, nonManifestChunks)
+ garbage = append(garbage, coveredChunks...)
+
+ if newEntry.Attributes != nil {
+ chunks, err = filer.MaybeManifestize(fs.saveAsChunk(newEntry.Attributes.Replication, newEntry.Attributes.Collection, "", "", needle.SecondsToTTL(newEntry.Attributes.TtlSec), false), chunks)
+ if err != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", err)
+ }
+ }
+
+ chunks = append(chunks, manifestChunks...)
+
+ return
+}
+
func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) {
glog.V(4).Infof("AppendToEntry %v", req)
@@ -233,9 +277,9 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
var offset int64 = 0
entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath))
if err == filer_pb.ErrNotFound {
- entry = &filer2.Entry{
+ entry = &filer.Entry{
FullPath: fullpath,
- Attr: filer2.Attr{
+ Attr: filer.Attr{
Crtime: time.Now(),
Mtime: time.Now(),
Mode: os.FileMode(0644),
@@ -244,7 +288,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
},
}
} else {
- offset = int64(filer2.TotalSize(entry.Chunks))
+ offset = int64(filer.TotalSize(entry.Chunks))
}
for _, chunk := range req.Chunks {
@@ -254,7 +298,13 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
entry.Chunks = append(entry.Chunks, req.Chunks...)
- err = fs.filer.CreateEntry(context.Background(), entry, false)
+ entry.Chunks, err = filer.MaybeManifestize(fs.saveAsChunk(entry.Replication, entry.Collection, "", "", needle.SecondsToTTL(entry.TtlSec), false), entry.Chunks)
+ if err != nil {
+ // not good, but should be ok
+ glog.V(0).Infof("MaybeManifestize: %v", err)
+ }
+
+ err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil)
return &filer_pb.AppendToEntryResponse{}, err
}
@@ -263,7 +313,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
glog.V(4).Infof("DeleteEntry %v", req)
- err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData)
+ err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData, req.IsFromOtherCluster, req.Signatures)
resp = &filer_pb.DeleteEntryResponse{}
if err != nil {
resp.Error = err.Error()
@@ -277,7 +327,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
if req.TtlSec > 0 {
ttlStr = strconv.Itoa(int(req.TtlSec))
}
- collection, replication, _ := fs.detectCollection(req.ParentPath, req.Collection, req.Replication)
+ collection, replication, _ := fs.detectCollection(req.Path, req.Collection, req.Replication)
var altRequest *operation.VolumeAssignRequest
@@ -285,6 +335,10 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
if dataCenter == "" {
dataCenter = fs.option.DataCenter
}
+ rack := req.Rack
+ if rack == "" {
+ rack = fs.option.Rack
+ }
assignRequest := &operation.VolumeAssignRequest{
Count: uint64(req.Count),
@@ -292,14 +346,16 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
Collection: collection,
Ttl: ttlStr,
DataCenter: dataCenter,
+ Rack: rack,
}
- if dataCenter != "" {
+ if dataCenter != "" || rack != "" {
altRequest = &operation.VolumeAssignRequest{
Count: uint64(req.Count),
Replication: replication,
Collection: collection,
Ttl: ttlStr,
DataCenter: "",
+ Rack: "",
}
}
assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest)
@@ -323,6 +379,28 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
}, nil
}
+func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.CollectionListRequest) (resp *filer_pb.CollectionListResponse, err error) {
+
+ glog.V(4).Infof("CollectionList %v", req)
+ resp = &filer_pb.CollectionListResponse{}
+
+ err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
+ IncludeNormalVolumes: req.IncludeNormalVolumes,
+ IncludeEcVolumes: req.IncludeEcVolumes,
+ })
+ if err != nil {
+ return err
+ }
+ for _, c := range masterResp.Collections {
+ resp.Collections = append(resp.Collections, &filer_pb.Collection{Name: c.Name})
+ }
+ return nil
+ })
+
+ return
+}
+
func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) {
glog.V(4).Infof("DeleteCollection %v", req)
@@ -369,12 +447,15 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
t := &filer_pb.GetFilerConfigurationResponse{
- Masters: fs.option.Masters,
- Collection: fs.option.Collection,
- Replication: fs.option.DefaultReplication,
- MaxMb: uint32(fs.option.MaxMB),
- DirBuckets: fs.filer.DirBucketsPath,
- Cipher: fs.filer.Cipher,
+ Masters: fs.option.Masters,
+ Collection: fs.option.Collection,
+ Replication: fs.option.DefaultReplication,
+ MaxMb: uint32(fs.option.MaxMB),
+ DirBuckets: fs.filer.DirBucketsPath,
+ Cipher: fs.filer.Cipher,
+ Signature: fs.filer.Signature,
+ MetricsAddress: fs.metricsAddress,
+ MetricsIntervalSec: int32(fs.metricsIntervalSec),
}
glog.V(4).Infof("GetFilerConfiguration: %v", t)
diff --git a/weed/server/filer_grpc_server_kv.go b/weed/server/filer_grpc_server_kv.go
new file mode 100644
index 000000000..3cb47115e
--- /dev/null
+++ b/weed/server/filer_grpc_server_kv.go
@@ -0,0 +1,42 @@
+package weed_server
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (fs *FilerServer) KvGet(ctx context.Context, req *filer_pb.KvGetRequest) (*filer_pb.KvGetResponse, error) {
+
+ value, err := fs.filer.Store.KvGet(ctx, req.Key)
+ if err == filer.ErrKvNotFound {
+ return &filer_pb.KvGetResponse{}, nil
+ }
+
+ if err != nil {
+ return &filer_pb.KvGetResponse{Error: err.Error()}, nil
+ }
+
+ return &filer_pb.KvGetResponse{
+ Value: value,
+ }, nil
+
+}
+
+// KvPut sets the key-value pair. An empty value deletes the entry.
+func (fs *FilerServer) KvPut(ctx context.Context, req *filer_pb.KvPutRequest) (*filer_pb.KvPutResponse, error) {
+
+ if len(req.Value) == 0 {
+ if err := fs.filer.Store.KvDelete(ctx, req.Key); err != nil {
+ return &filer_pb.KvPutResponse{Error: err.Error()}, nil
+ }
+ return &filer_pb.KvPutResponse{}, nil
+ }
+
+ err := fs.filer.Store.KvPut(ctx, req.Key, req.Value)
+ if err != nil {
+ return &filer_pb.KvPutResponse{Error: err.Error()}, nil
+ }
+
+ return &filer_pb.KvPutResponse{}, nil
+
+}
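A hedged client-side sketch of the new KV endpoints; the filer gRPC address is an assumption and error handling is abbreviated. Per the server code above, putting an empty value deletes the key.

package main

import (
	"context"
	"fmt"
	"log"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/grpc"
)

func main() {
	// Assumed address: the filer's gRPC port (HTTP port + 10000 by default).
	conn, err := grpc.Dial("localhost:18888", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := filer_pb.NewSeaweedFilerClient(conn)

	if _, err := client.KvPut(context.Background(), &filer_pb.KvPutRequest{
		Key:   []byte("config/owner"),
		Value: []byte("alice"),
	}); err != nil {
		log.Fatal(err)
	}

	resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: []byte("config/owner")})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("value=%s error=%q\n", resp.Value, resp.Error)
}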
diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go
deleted file mode 100644
index 848a1fc3a..000000000
--- a/weed/server/filer_grpc_server_listen.go
+++ /dev/null
@@ -1,108 +0,0 @@
-package weed_server
-
-import (
- "fmt"
- "strings"
- "time"
-
- "github.com/golang/protobuf/proto"
-
- "github.com/chrislusf/seaweedfs/weed/filer2"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
-
- peerAddress := findClientAddress(stream.Context(), 0)
-
- clientName := fs.addClient(req.ClientName, peerAddress)
-
- defer fs.deleteClient(clientName)
-
- lastReadTime := time.Unix(0, req.SinceNs)
- glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- var processedTsNs int64
-
- eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
-
- // get complete path to the file or directory
- var entryName string
- if eventNotification.OldEntry != nil {
- entryName = eventNotification.OldEntry.Name
- } else if eventNotification.NewEntry != nil {
- entryName = eventNotification.NewEntry.Name
- }
-
- fullpath := util.Join(dirPath, entryName)
-
- // skip on filer internal meta logs
- if strings.HasPrefix(fullpath, filer2.SystemLogDir) {
- return nil
- }
-
- if !strings.HasPrefix(fullpath, req.PathPrefix) {
- return nil
- }
-
- message := &filer_pb.SubscribeMetadataResponse{
- Directory: dirPath,
- EventNotification: eventNotification,
- TsNs: tsNs,
- }
- if err := stream.Send(message); err != nil {
- glog.V(0).Infof("=> client %v: %+v", clientName, err)
- return err
- }
- return nil
- }
-
- eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error {
- event := &filer_pb.SubscribeMetadataResponse{}
- if err := proto.Unmarshal(logEntry.Data, event); err != nil {
- glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
- return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
- }
-
- if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil {
- return err
- }
-
- processedTsNs = logEntry.TsNs
-
- return nil
- }
-
- if err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn); err != nil {
- return fmt.Errorf("reading from persisted logs: %v", err)
- }
-
- if processedTsNs != 0 {
- lastReadTime = time.Unix(0, processedTsNs)
- }
-
- err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
- fs.listenersLock.Lock()
- fs.listenersCond.Wait()
- fs.listenersLock.Unlock()
- return true
- }, eachLogEntryFn)
-
- return err
-
-}
-
-func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) {
- clientName = clientType + "@" + clientAddress
- glog.V(0).Infof("+ listener %v", clientName)
- return
-}
-
-func (fs *FilerServer) deleteClient(clientName string) {
- glog.V(0).Infof("- listener %v", clientName)
-}
-
-func (fs *FilerServer) notifyMetaListeners() {
- fs.listenersCond.Broadcast()
-}
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 7029c3342..f9ddeb600 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -5,7 +5,7 @@ import (
"fmt"
"path/filepath"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -43,7 +43,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return &filer_pb.AtomicRenameEntryResponse{}, nil
}
-func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events, func() error {
if entry.IsDirectory() {
@@ -59,7 +59,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e
return nil
}
-func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents) error {
currentDirPath := oldParent.Child(entry.Name())
newDirPath := newParent.Child(newName)
@@ -70,7 +70,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
includeLastFile := false
for {
- entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024)
+ entries, err := fs.filer.ListDirectoryEntries(ctx, currentDirPath, lastFileName, includeLastFile, 1024, "")
if err != nil {
return err
}
@@ -92,7 +92,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
return nil
}
-func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents,
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, events *MoveEvents,
moveFolderSubEntries func() error) error {
oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
@@ -105,12 +105,13 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
}
// add to new directory
- newEntry := &filer2.Entry{
+ newEntry := &filer.Entry{
FullPath: newPath,
Attr: entry.Attr,
Chunks: entry.Chunks,
+ Extended: entry.Extended,
}
- createErr := fs.filer.CreateEntry(ctx, newEntry, false)
+ createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, nil)
if createErr != nil {
return createErr
}
@@ -124,7 +125,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
}
// delete old entry
- deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false)
+ deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false, false, nil)
if deleteErr != nil {
return deleteErr
}
@@ -136,6 +137,6 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
}
type MoveEvents struct {
- oldEntries []*filer2.Entry
- newEntries []*filer2.Entry
+ oldEntries []*filer.Entry
+ newEntries []*filer.Entry
}
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
new file mode 100644
index 000000000..634fb5211
--- /dev/null
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -0,0 +1,181 @@
+package weed_server
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/util/log_buffer"
+ "strings"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error {
+
+ peerAddress := findClientAddress(stream.Context(), 0)
+
+ clientName := fs.addClient(req.ClientName, peerAddress)
+
+ defer fs.deleteClient(clientName)
+
+ lastReadTime := time.Unix(0, req.SinceNs)
+ glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
+
+ eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
+
+ processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+
+ for {
+ lastReadTime, err = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.filer.MetaAggregator.ListenersLock.Lock()
+ fs.filer.MetaAggregator.ListenersCond.Wait()
+ fs.filer.MetaAggregator.ListenersLock.Unlock()
+ return true
+ }, eachLogEntryFn)
+ if err != nil {
+ glog.Errorf("processed to %v: %v", lastReadTime, err)
+ time.Sleep(3127 * time.Millisecond)
+ if err != log_buffer.ResumeError {
+ break
+ }
+ }
+ }
+
+ return err
+
+}
+
+func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeLocalMetadataServer) error {
+
+ peerAddress := findClientAddress(stream.Context(), 0)
+
+ clientName := fs.addClient(req.ClientName, peerAddress)
+
+ defer fs.deleteClient(clientName)
+
+ lastReadTime := time.Unix(0, req.SinceNs)
+ glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+
+ eachEventNotificationFn := fs.eachEventNotificationFn(req, stream, clientName, req.Signature)
+
+ eachLogEntryFn := eachLogEntryFn(eachEventNotificationFn)
+
+ // println("reading from persisted logs ...")
+ processedTsNs, err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ if err != nil {
+ return fmt.Errorf("reading from persisted logs: %v", err)
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+ glog.V(0).Infof("after local log reads, %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
+
+ // println("reading from in memory logs ...")
+ for {
+ lastReadTime, err = fs.filer.LocalMetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool {
+ fs.listenersLock.Lock()
+ fs.listenersCond.Wait()
+ fs.listenersLock.Unlock()
+ return true
+ }, eachLogEntryFn)
+ if err != nil {
+ glog.Errorf("processed to %v: %v", lastReadTime, err)
+ time.Sleep(3127 * time.Millisecond)
+ if err != log_buffer.ResumeError {
+ break
+ }
+ }
+ }
+
+ return err
+
+}
+
+func eachLogEntryFn(eachEventNotificationFn func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error) func(logEntry *filer_pb.LogEntry) error {
+ return func(logEntry *filer_pb.LogEntry) error {
+ event := &filer_pb.SubscribeMetadataResponse{}
+ if err := proto.Unmarshal(logEntry.Data, event); err != nil {
+ glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err)
+ }
+
+ if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil {
+ return err
+ }
+
+ return nil
+ }
+}
+
+func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer, clientName string, clientSignature int32) func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+ return func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error {
+
+ foundSelf := false
+ for _, sig := range eventNotification.Signatures {
+ if sig == clientSignature && clientSignature != 0 {
+ return nil
+ }
+ if sig == fs.filer.Signature {
+ foundSelf = true
+ }
+ }
+ if !foundSelf {
+ eventNotification.Signatures = append(eventNotification.Signatures, fs.filer.Signature)
+ }
+
+ // get complete path to the file or directory
+ var entryName string
+ if eventNotification.OldEntry != nil {
+ entryName = eventNotification.OldEntry.Name
+ } else if eventNotification.NewEntry != nil {
+ entryName = eventNotification.NewEntry.Name
+ }
+
+ fullpath := util.Join(dirPath, entryName)
+
+ // skip on filer internal meta logs
+ if strings.HasPrefix(fullpath, filer.SystemLogDir) {
+ return nil
+ }
+
+ if !strings.HasPrefix(fullpath, req.PathPrefix) {
+ return nil
+ }
+
+ message := &filer_pb.SubscribeMetadataResponse{
+ Directory: dirPath,
+ EventNotification: eventNotification,
+ TsNs: tsNs,
+ }
+ // println("sending", dirPath, entryName)
+ if err := stream.Send(message); err != nil {
+ glog.V(0).Infof("=> client %v: %+v", clientName, err)
+ return err
+ }
+ return nil
+ }
+}
+
+func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) {
+ clientName = clientType + "@" + clientAddress
+ glog.V(0).Infof("+ listener %v", clientName)
+ return
+}
+
+func (fs *FilerServer) deleteClient(clientName string) {
+ glog.V(0).Infof("- listener %v", clientName)
+}
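The Signature field added above lets a subscriber suppress events it produced itself: the server drops any notification already carrying the client's signature, and stamps its own signature before forwarding. A sketch of a subscriber using it; the address and signature value are assumptions.

package main

import (
	"context"
	"io"
	"log"
	"time"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/grpc"
)

func main() {
	conn, err := grpc.Dial("localhost:18888", grpc.WithInsecure())
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := filer_pb.NewSeaweedFilerClient(conn)
	stream, err := client.SubscribeMetadata(context.Background(), &filer_pb.SubscribeMetadataRequest{
		ClientName: "example-subscriber",
		PathPrefix: "/buckets/",
		SinceNs:    time.Now().Add(-time.Hour).UnixNano(),
		Signature:  12345, // hypothetical; real clients pick a random non-zero int32
	})
	if err != nil {
		log.Fatal(err)
	}
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			break
		}
		if err != nil {
			log.Fatal(err)
		}
		log.Printf("%s: %+v", resp.Directory, resp.EventNotification)
	}
}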
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 10b607dfe..065bb3251 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -8,6 +8,8 @@ import (
"sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/util/grace"
@@ -15,19 +17,19 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
- "github.com/chrislusf/seaweedfs/weed/filer2"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/cassandra"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/redis"
- _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
@@ -46,20 +48,26 @@ type FilerOption struct {
MaxMB int
DirListingLimit int
DataCenter string
+ Rack string
DefaultLevelDbDir string
DisableHttp bool
Host string
Port uint32
recursiveDelete bool
Cipher bool
+ Filers []string
}
type FilerServer struct {
option *FilerOption
secret security.SigningKey
- filer *filer2.Filer
+ filer *filer.Filer
grpcDialOption grpc.DialOption
+ // metrics read from the master
+ metricsAddress string
+ metricsIntervalSec int
+
// notifying clients
listenersLock sync.Mutex
listenersCond *sync.Cond
@@ -81,11 +89,14 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, fs.notifyMetaListeners)
+ fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
+ fs.listenersCond.Broadcast()
+ })
fs.filer.Cipher = option.Cipher
- maybeStartMetrics(fs, option)
+ fs.checkWithMaster()
+ go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
go fs.filer.KeepConnectedToMaster()
v := util.GetViper()
@@ -96,6 +107,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
if os.IsNotExist(err) {
os.MkdirAll(option.DefaultLevelDbDir, 0755)
}
+ glog.V(0).Infof("default to create filer store dir in %s", option.DefaultLevelDbDir)
}
util.LoadConfiguration("notification", false)
@@ -115,6 +127,8 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
+ fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
+
fs.filer.LoadBuckets()
grace.OnInterrupt(func() {
@@ -124,9 +138,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
return fs, nil
}
-func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
+func (fs *FilerServer) checkWithMaster() {
- for _, master := range option.Masters {
+ for _, master := range fs.option.Masters {
_, err := pb.ParseFilerGrpcAddress(master)
if err != nil {
glog.Fatalf("invalid master address %s: %v", master, err)
@@ -134,12 +148,19 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
}
isConnected := false
- var metricsAddress string
- var metricsIntervalSec int
- var readErr error
for !isConnected {
- for _, master := range option.Masters {
- metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master)
+ for _, master := range fs.option.Masters {
+ readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", master, err)
+ }
+ fs.metricsAddress, fs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ if fs.option.DefaultReplication == "" {
+ fs.option.DefaultReplication = resp.DefaultReplication
+ }
+ return nil
+ })
if readErr == nil {
isConnected = true
} else {
@@ -147,23 +168,5 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) {
}
}
}
- if metricsAddress == "" && metricsIntervalSec <= 0 {
- return
- }
- go stats.LoopPushingMetric("filer", stats.SourceName(option.Port), stats.FilerGather,
- func() (addr string, intervalSeconds int) {
- return metricsAddress, metricsIntervalSec
- })
-}
-func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) {
- err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
- resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
- if err != nil {
- return fmt.Errorf("get master %s configuration: %v", masterAddress, err)
- }
- metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
- return nil
- })
- return
}
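checkWithMaster above blocks startup until some master answers, cycling through the configured list and pulling the metrics address and default replication from the first reachable one. A self-contained sketch of that retry pattern, with a stand-in for the gRPC call; the sleep between rounds is an assumption, not shown in the hunk.

package main

import (
	"errors"
	"fmt"
	"time"
)

// waitForMaster loops over the master list until one fetch succeeds.
func waitForMaster(masters []string, fetchConfig func(string) error) {
	for {
		for _, m := range masters {
			if err := fetchConfig(m); err == nil {
				return
			} else {
				fmt.Printf("checking master %s: %v\n", m, err)
			}
		}
		time.Sleep(time.Second) // assumed backoff between full rounds
	}
}

func main() {
	calls := 0
	waitForMaster([]string{"m1:9333", "m2:9333"}, func(m string) error {
		calls++
		if calls < 3 {
			return errors.New("unreachable")
		}
		return nil
	})
	fmt.Println("connected after", calls, "attempts")
}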
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index b6bfc3b04..18f78881c 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"time"
@@ -8,6 +9,7 @@ import (
)
func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
start := time.Now()
switch r.Method {
case "GET":
@@ -34,6 +36,7 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
}
func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
start := time.Now()
switch r.Method {
case "GET":
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 76c924df1..731bd3545 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -11,7 +11,7 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -93,29 +93,42 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
}
+ // set tag count
+ if r.Method == "GET" {
+ tagCount := 0
+ for k := range entry.Extended {
+ if strings.HasPrefix(k, "x-amz-tagging-") {
+ tagCount++
+ }
+ }
+ if tagCount > 0 {
+ w.Header().Set("x-amz-tag-count", strconv.Itoa(tagCount))
+ }
+ }
+
// set etag
- etag := filer2.ETagEntry(entry)
+ etag := filer.ETagEntry(entry)
if inm := r.Header.Get("If-None-Match"); inm == "\""+etag+"\"" {
w.WriteHeader(http.StatusNotModified)
return
}
setEtag(w, etag)
+ filename := entry.Name()
+ adjustHeaderContentDisposition(w, r, filename)
+
if r.Method == "HEAD" {
- w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10))
+ w.Header().Set("Content-Length", strconv.FormatInt(int64(entry.Size()), 10))
return
}
- filename := entry.Name()
- adjustHeadersAfterHEAD(w, r, filename)
-
- totalSize := int64(filer2.TotalSize(entry.Chunks))
+ totalSize := int64(entry.Size())
if rangeReq := r.Header.Get("Range"); rangeReq == "" {
ext := filepath.Ext(filename)
width, height, mode, shouldResize := shouldResizeImages(ext, r)
if shouldResize {
- data, err := filer2.ReadAll(fs.filer.MasterClient, entry.Chunks)
+ data, err := filer.ReadAll(fs.filer.MasterClient, entry.Chunks)
if err != nil {
glog.Errorf("failed to read %s: %v", path, err)
w.WriteHeader(http.StatusNotModified)
@@ -128,7 +141,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request,
}
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
- return filer2.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
+ return filer.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size)
})
}
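The read path above now answers 304 for a matching If-None-Match and streams byte ranges through filer.StreamContent. A quick sketch exercising the ETag round trip against a running filer; the URL is an assumption.

package main

import (
	"fmt"
	"log"
	"net/http"
)

func main() {
	url := "http://localhost:8888/some/file.txt" // assumed: any file path on the filer

	// First GET captures the ETag the handler sets.
	resp, err := http.Get(url)
	if err != nil {
		log.Fatal(err)
	}
	resp.Body.Close()
	etag := resp.Header.Get("ETag")

	// Replaying it verbatim in If-None-Match should yield 304 Not Modified.
	req, _ := http.NewRequest("GET", url, nil)
	req.Header.Set("If-None-Match", etag)
	resp2, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp2.Body.Close()
	fmt.Println(resp2.StatusCode) // expect 304
}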
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index ae28fc1db..99345550c 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -2,6 +2,9 @@ package weed_server
import (
"context"
+ "encoding/base64"
+ "fmt"
+ "github.com/skip2/go-qrcode"
"net/http"
"strconv"
"strings"
@@ -32,7 +35,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName := r.FormValue("lastFileName")
- entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit)
+ entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit, "")
if err != nil {
glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err)
@@ -65,21 +68,30 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
lastFileName,
shouldDisplayLoadMore,
})
- } else {
- ui.StatusTpl.Execute(w, struct {
- Path string
- Breadcrumbs []ui.Breadcrumb
- Entries interface{}
- Limit int
- LastFileName string
- ShouldDisplayLoadMore bool
- }{
- path,
- ui.ToBreadcrumb(path),
- entries,
- limit,
- lastFileName,
- shouldDisplayLoadMore,
- })
+ return
+ }
+
+ var qrImageString string
+ img, err := qrcode.Encode(fmt.Sprintf("http://%s:%d%s", fs.option.Host, fs.option.Port, r.URL.Path), qrcode.Medium, 128)
+ if err == nil {
+ qrImageString = base64.StdEncoding.EncodeToString(img)
}
+
+ ui.StatusTpl.Execute(w, struct {
+ Path string
+ Breadcrumbs []ui.Breadcrumb
+ Entries interface{}
+ Limit int
+ LastFileName string
+ ShouldDisplayLoadMore bool
+ QrImage string
+ }{
+ path,
+ ui.ToBreadcrumb(path),
+ entries,
+ limit,
+ lastFileName,
+ shouldDisplayLoadMore,
+ qrImageString,
+ })
}
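The directory listing now embeds a QR code pointing at the listing URL, so the page can be opened from a phone. The generation itself is two calls, sketched here with a placeholder URL.

package main

import (
	"encoding/base64"
	"fmt"
	"log"

	"github.com/skip2/go-qrcode"
)

func main() {
	// Encode the listing URL as a 128x128 PNG, matching the handler above.
	png, err := qrcode.Encode("http://localhost:8888/some/dir/", qrcode.Medium, 128)
	if err != nil {
		log.Fatal(err)
	}
	// The template can embed the result as a data URI.
	fmt.Printf("data:image/png;base64,%s\n", base64.StdEncoding.EncodeToString(png))
}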
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 74a558e22..267b8752d 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -2,22 +2,11 @@ package weed_server
import (
"context"
- "crypto/md5"
- "encoding/json"
- "errors"
- "fmt"
- "io"
- "io/ioutil"
- "mime"
"net/http"
- "net/url"
"os"
- filenamePath "path"
- "strconv"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -40,7 +29,7 @@ type FilerPostResult struct {
Url string `json:"url,omitempty"`
}
-func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
+func (fs *FilerServer) assignNewFileInfo(replication, collection, dataCenter, rack, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) {
stats.FilerRequestCounter.WithLabelValues("assign").Inc()
start := time.Now()
@@ -54,20 +43,20 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request,
DataCenter: dataCenter,
}
var altRequest *operation.VolumeAssignRequest
- if dataCenter != "" {
+ if dataCenter != "" || rack != "" {
altRequest = &operation.VolumeAssignRequest{
Count: 1,
Replication: replication,
Collection: collection,
Ttl: ttlString,
DataCenter: "",
+ Rack: "",
}
}
assignResult, ae := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, ar, altRequest)
if ae != nil {
glog.Errorf("failing to assign a file id: %v", ae)
- writeJsonError(w, r, http.StatusInternalServerError, ae)
err = ae
return
}
@@ -90,6 +79,10 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
if dataCenter == "" {
dataCenter = fs.option.DataCenter
}
+ rack := query.Get("rack")
+ if dataCenter == "" {
+ rack = fs.option.Rack
+ }
ttlString := r.URL.Query().Get("ttl")
// read ttl in seconds
@@ -99,206 +92,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ttlSeconds = int32(ttl.Minutes()) * 60
}
- if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked {
- return
- }
-
- if fs.option.Cipher {
- reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync)
- if err != nil {
- writeJsonError(w, r, http.StatusInternalServerError, err)
- } else if reply != nil {
- writeJsonQuiet(w, r, http.StatusCreated, reply)
- }
-
- return
- }
-
- fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
-
- if err != nil || fileId == "" || urlLocation == "" {
- glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
- writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter))
- return
- }
-
- glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation)
-
- u, _ := url.Parse(urlLocation)
- ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId)
- if err != nil {
- return
- }
-
- if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil {
- return
- }
-
- // send back post result
- reply := FilerPostResult{
- Name: ret.Name,
- Size: int64(ret.Size),
- Error: ret.Error,
- Fid: fileId,
- Url: urlLocation,
- }
- setEtag(w, ret.ETag)
- writeJsonQuiet(w, r, http.StatusCreated, reply)
-}
-
-// update metadata in filer store
-func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string,
- collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) {
-
- stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc()
- start := time.Now()
- defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postStoreWrite").Observe(time.Since(start).Seconds())
- }()
-
- modeStr := r.URL.Query().Get("mode")
- if modeStr == "" {
- modeStr = "0660"
- }
- mode, err := strconv.ParseUint(modeStr, 8, 32)
- if err != nil {
- glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
- mode = 0660
- }
-
- path := r.URL.Path
- if strings.HasSuffix(path, "/") {
- if ret.Name != "" {
- path += ret.Name
- }
- }
- existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
- crTime := time.Now()
- if err == nil && existingEntry != nil {
- crTime = existingEntry.Crtime
- }
- entry := &filer2.Entry{
- FullPath: util.FullPath(path),
- Attr: filer2.Attr{
- Mtime: time.Now(),
- Crtime: crTime,
- Mode: os.FileMode(mode),
- Uid: OS_UID,
- Gid: OS_GID,
- Replication: replication,
- Collection: collection,
- TtlSec: ttlSeconds,
- Mime: ret.Mime,
- Md5: md5value,
- },
- Chunks: []*filer_pb.FileChunk{{
- FileId: fileId,
- Size: uint64(ret.Size),
- Mtime: time.Now().UnixNano(),
- ETag: ret.ETag,
- }},
- }
- if entry.Attr.Mime == "" {
- if ext := filenamePath.Ext(path); ext != "" {
- entry.Attr.Mime = mime.TypeByExtension(ext)
- }
- }
- // glog.V(4).Infof("saving %s => %+v", path, entry)
- if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil {
- fs.filer.DeleteChunks(entry.Chunks)
- glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
- writeJsonError(w, r, http.StatusInternalServerError, dbErr)
- err = dbErr
- return
- }
-
- return nil
-}
-
-// send request to volume server
-func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) {
-
- stats.FilerRequestCounter.WithLabelValues("postUpload").Inc()
- start := time.Now()
- defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }()
+ fs.autoChunk(ctx, w, r, replication, collection, dataCenter, rack, ttlSeconds, ttlString, fsync)
- ret = &operation.UploadResult{}
-
- md5Hash := md5.New()
- body := r.Body
- if r.Method == "PUT" {
- // only PUT or large chunked files has Md5 in attributes
- body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash))
- }
-
- request := &http.Request{
- Method: r.Method,
- URL: u,
- Proto: r.Proto,
- ProtoMajor: r.ProtoMajor,
- ProtoMinor: r.ProtoMinor,
- Header: r.Header,
- Body: body,
- Host: r.Host,
- ContentLength: r.ContentLength,
- }
-
- if auth != "" {
- request.Header.Set("Authorization", "BEARER "+string(auth))
- }
- resp, doErr := util.Do(request)
- if doErr != nil {
- glog.Errorf("failing to connect to volume server %s: %v, %+v", r.RequestURI, doErr, r.Method)
- writeJsonError(w, r, http.StatusInternalServerError, doErr)
- err = doErr
- return
- }
- defer func() {
- io.Copy(ioutil.Discard, resp.Body)
- resp.Body.Close()
- }()
-
- respBody, raErr := ioutil.ReadAll(resp.Body)
- if raErr != nil {
- glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error())
- writeJsonError(w, r, http.StatusInternalServerError, raErr)
- err = raErr
- return
- }
-
- glog.V(4).Infoln("post result", string(respBody))
- unmarshalErr := json.Unmarshal(respBody, &ret)
- if unmarshalErr != nil {
- glog.V(0).Infoln("failing to read upload resonse", r.RequestURI, string(respBody))
- writeJsonError(w, r, http.StatusInternalServerError, unmarshalErr)
- err = unmarshalErr
- return
- }
- if ret.Error != "" {
- err = errors.New(ret.Error)
- glog.V(0).Infoln("failing to post to volume server", r.RequestURI, ret.Error)
- writeJsonError(w, r, http.StatusInternalServerError, err)
- return
- }
- // find correct final path
- path := r.URL.Path
- if strings.HasSuffix(path, "/") {
- if ret.Name != "" {
- path += ret.Name
- } else {
- err = fmt.Errorf("can not to write to folder %s without a file name", path)
- fs.filer.DeleteFileByFileId(fileId)
- glog.V(0).Infoln("Can not to write to folder", path, "without a file name!")
- writeJsonError(w, r, http.StatusInternalServerError, err)
- return
- }
- }
- // use filer calculated md5 ETag, instead of the volume server crc ETag
- if r.Method == "PUT" {
- md5value = md5Hash.Sum(nil)
- }
- ret.ETag = getEtag(resp)
- return
}
// curl -X DELETE http://localhost:8888/path/to
@@ -316,9 +111,14 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true"
skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true"
- err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, !skipChunkDeletion)
+ objectPath := r.URL.Path
+ if len(r.URL.Path) > 1 && strings.HasSuffix(objectPath, "/") {
+ objectPath = objectPath[0 : len(objectPath)-1]
+ }
+
+ err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(objectPath), isRecursive, ignoreRecursiveError, !skipChunkDeletion, false, nil)
if err != nil {
- glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error())
+ glog.V(1).Infoln("deleting", objectPath, ":", err.Error())
httpStatus := http.StatusInternalServerError
if err == filer_pb.ErrNotFound {
httpStatus = http.StatusNotFound
@@ -344,6 +144,7 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st
}
// required by buckets folder
+ bucketDefaultReplication := ""
if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:]
t := strings.Index(bucketAndObjectKey, "/")
@@ -353,7 +154,10 @@ func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication st
if t > 0 {
collection = bucketAndObjectKey[:t]
}
- replication, fsync = fs.filer.ReadBucketOption(collection)
+ bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(collection)
+ }
+ if replication == "" {
+ replication = bucketDefaultReplication
}
return
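The detectCollection change above reorders replication precedence: an explicit ?replication= query value now wins, and the bucket's stored default only fills the gap. A tiny self-contained sketch of that fallback.

package main

import "fmt"

// pickReplication mirrors the precedence in the hunk above: query value first,
// then the bucket's default replication.
func pickReplication(queryReplication, bucketDefault string) string {
	replication := queryReplication
	if replication == "" {
		replication = bucketDefault
	}
	return replication
}

func main() {
	fmt.Println(pickReplication("001", "200")) // "001": explicit query wins
	fmt.Println(pickReplication("", "200"))    // "200": bucket default applies
}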
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 532693742..d86d49b2a 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -3,15 +3,18 @@ package weed_server
import (
"context"
"crypto/md5"
+ "fmt"
+ "hash"
"io"
"io/ioutil"
"net/http"
+ "os"
"path"
"strconv"
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -20,12 +23,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
- replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool {
- if r.Method != "POST" {
- glog.V(4).Infoln("AutoChunking not supported for method", r.Method)
- return false
- }
+func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string, rack string, ttlSec int32, ttlString string, fsync bool) {
// autoChunking can be set at the command-line level or as a query param. Query param overrides command-line
query := r.URL.Query()
@@ -35,54 +33,47 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
if maxMB <= 0 && fs.option.MaxMB > 0 {
maxMB = int32(fs.option.MaxMB)
}
- if maxMB <= 0 {
- glog.V(4).Infoln("AutoChunking not enabled")
- return false
- }
- glog.V(4).Infoln("AutoChunking level set to", maxMB, "(MB)")
chunkSize := 1024 * 1024 * maxMB
- contentLength := int64(0)
- if contentLengthHeader := r.Header["Content-Length"]; len(contentLengthHeader) == 1 {
- contentLength, _ = strconv.ParseInt(contentLengthHeader[0], 10, 64)
- if contentLength <= int64(chunkSize) {
- glog.V(4).Infoln("Content-Length of", contentLength, "is less than the chunk size of", chunkSize, "so autoChunking will be skipped.")
- return false
- }
- }
+ stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
+ start := time.Now()
+ defer func() {
+ stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
+ }()
- if contentLength <= 0 {
- glog.V(4).Infoln("Content-Length value is missing or unexpected so autoChunking will be skipped.")
- return false
+ var reply *FilerPostResult
+ var err error
+ var md5bytes []byte
+ if r.Method == "POST" {
+ if r.Header.Get("Content-Type") == "" && strings.HasSuffix(r.URL.Path, "/") {
+ reply, err = fs.mkdir(ctx, w, r)
+ } else {
+ reply, md5bytes, err = fs.doPostAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, rack, ttlSec, ttlString, fsync)
+ }
+ } else {
+ reply, md5bytes, err = fs.doPutAutoChunk(ctx, w, r, chunkSize, replication, collection, dataCenter, rack, ttlSec, ttlString, fsync)
}
-
- reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else if reply != nil {
+ if len(md5bytes) > 0 {
+ w.Header().Set("Content-MD5", util.Base64Encode(md5bytes))
+ }
writeJsonQuiet(w, r, http.StatusCreated, reply)
}
- return true
}
-func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request,
- contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) {
-
- stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc()
- start := time.Now()
- defer func() {
- stats.FilerRequestHistogram.WithLabelValues("postAutoChunk").Observe(time.Since(start).Seconds())
- }()
+func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, rack string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
multipartReader, multipartReaderErr := r.MultipartReader()
if multipartReaderErr != nil {
- return nil, multipartReaderErr
+ return nil, nil, multipartReaderErr
}
part1, part1Err := multipartReader.NextPart()
if part1Err != nil {
- return nil, part1Err
+ return nil, nil, part1Err
}
fileName := part1.FileName()
@@ -90,48 +81,63 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
fileName = path.Base(fileName)
}
contentType := part1.Header.Get("Content-Type")
+ if contentType == "application/octet-stream" {
+ contentType = ""
+ }
- var fileChunks []*filer_pb.FileChunk
+ fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, part1, chunkSize, replication, collection, dataCenter, rack, ttlString, fileName, contentType, fsync)
+ if err != nil {
+ return nil, nil, err
+ }
- md5Hash := md5.New()
- var partReader = ioutil.NopCloser(io.TeeReader(part1, md5Hash))
+ fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, rack, ttlString, fsync), fileChunks)
+ if replyerr != nil {
+ glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+ return
+ }
- chunkOffset := int64(0)
+ md5bytes = md5Hash.Sum(nil)
+ filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset)
- for chunkOffset < contentLength {
- limitedReader := io.LimitReader(partReader, int64(chunkSize))
+ return
+}
- // assign one file id for one chunk
- fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
- if assignErr != nil {
- return nil, assignErr
- }
+func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, chunkSize int32, replication string, collection string, dataCenter string, rack string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, md5bytes []byte, replyerr error) {
- // upload the chunk to the volume server
- uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
- if uploadErr != nil {
- return nil, uploadErr
- }
+ fileName := ""
+ contentType := ""
- // if last chunk exhausted the reader exactly at the border
- if uploadResult.Size == 0 {
- break
- }
+ fileChunks, md5Hash, chunkOffset, err := fs.uploadReaderToChunks(w, r, r.Body, chunkSize, replication, collection, dataCenter, rack, ttlString, fileName, contentType, fsync)
+ if err != nil {
+ return nil, nil, err
+ }
- // Save to chunk manifest structure
- fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
+ fileChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(replication, collection, dataCenter, rack, ttlString, fsync), fileChunks)
+ if replyerr != nil {
+ glog.V(0).Infof("manifestize %s: %v", r.RequestURI, replyerr)
+ return
+ }
- glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength)
+ md5bytes = md5Hash.Sum(nil)
+ filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, replication, collection, ttlSec, contentType, md5bytes, fileChunks, chunkOffset)
- // reset variables for the next chunk
- chunkOffset = chunkOffset + int64(uploadResult.Size)
+ return
+}
- // if last chunk was not at full chunk size, but already exhausted the reader
- if int64(uploadResult.Size) < int64(chunkSize) {
- break
- }
+func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, replication string, collection string, ttlSec int32, contentType string, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64) (filerResult *FilerPostResult, replyerr error) {
+
+ // detect file mode
+ modeStr := r.URL.Query().Get("mode")
+ if modeStr == "" {
+ modeStr = "0660"
+ }
+ mode, err := strconv.ParseUint(modeStr, 8, 32)
+ if err != nil {
+ glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+ mode = 0660
}
+ // fix the path
path := r.URL.Path
if strings.HasSuffix(path, "/") {
if fileName != "" {
@@ -139,20 +145,28 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
}
}
+ // preserve the original crTime if the entry already exists
+ existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+ crTime := time.Now()
+ if err == nil && existingEntry != nil {
+ crTime = existingEntry.Crtime
+ }
+
glog.V(4).Infoln("saving", path)
- entry := &filer2.Entry{
+ entry := &filer.Entry{
FullPath: util.FullPath(path),
- Attr: filer2.Attr{
+ Attr: filer.Attr{
Mtime: time.Now(),
- Crtime: time.Now(),
- Mode: 0660,
+ Crtime: crTime,
+ Mode: os.FileMode(mode),
Uid: OS_UID,
Gid: OS_GID,
Replication: replication,
Collection: collection,
TtlSec: ttlSec,
Mime: contentType,
- Md5: md5Hash.Sum(nil),
+ Md5: md5bytes,
+ FileSize: uint64(chunkOffset),
},
Chunks: fileChunks,
}
@@ -162,15 +176,57 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r
Size: chunkOffset,
}
- if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
fs.filer.DeleteChunks(entry.Chunks)
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
- return
}
+ return filerResult, replyerr
+}
- return
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, replication string, collection string, dataCenter string, rack string, ttlString string, fileName string, contentType string, fsync bool) ([]*filer_pb.FileChunk, hash.Hash, int64, error) {
+ var fileChunks []*filer_pb.FileChunk
+
+ md5Hash := md5.New()
+ var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+
+ chunkOffset := int64(0)
+
+ for {
+ limitedReader := io.LimitReader(partReader, int64(chunkSize))
+
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, rack, ttlString, fsync)
+ if assignErr != nil {
+ return nil, nil, 0, assignErr
+ }
+
+ // upload the chunk to the volume server
+ uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth)
+ if uploadErr != nil {
+ return nil, nil, 0, uploadErr
+ }
+
+ // if last chunk exhausted the reader exactly at the border
+ if uploadResult.Size == 0 {
+ break
+ }
+
+ // Save to chunk manifest structure
+ fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
+
+ glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
+
+ // reset variables for the next chunk
+ chunkOffset = chunkOffset + int64(uploadResult.Size)
+
+ // if last chunk was not at full chunk size, but already exhausted the reader
+ if int64(uploadResult.Size) < int64(chunkSize) {
+ break
+ }
+ }
+ return fileChunks, md5Hash, chunkOffset, nil
}
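
The extracted uploadReaderToChunks preserves one invariant worth noting: every byte handed to a chunk upload first passes through the MD5 tee, so the digest covers the whole stream even though the upload happens one LimitReader-bounded chunk at a time; the two break conditions (a zero-size chunk, or a short chunk) are how the loop detects the end of the reader. A standalone sketch of the same pattern, with the chunk upload stubbed out as a byte count:

    package main

    import (
        "crypto/md5"
        "fmt"
        "io"
        "strings"
    )

    func main() {
        const chunkSize = 8
        reader := strings.NewReader("the quick brown fox jumps over the lazy dog")

        md5Hash := md5.New()
        // Everything read from partReader is also written into md5Hash.
        partReader := io.TeeReader(reader, md5Hash)

        var chunkOffset int64
        for chunkIndex := 1; ; chunkIndex++ {
            limitedReader := io.LimitReader(partReader, chunkSize)
            n, err := io.Copy(io.Discard, limitedReader) // stand-in for the chunk upload
            if err != nil {
                panic(err)
            }
            if n == 0 {
                break // the previous chunk ended exactly at the reader's border
            }
            fmt.Printf("chunk %d: [%d,%d)\n", chunkIndex, chunkOffset, chunkOffset+n)
            chunkOffset += n
            if n < chunkSize {
                break // short chunk: the reader is exhausted
            }
        }
        fmt.Printf("size=%d md5=%x\n", chunkOffset, md5Hash.Sum(nil))
    }
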
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) {
@@ -184,3 +240,71 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht
uploadResult, err, _ := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
return uploadResult, err
}
+
+func (fs *FilerServer) saveAsChunk(replication string, collection string, dataCenter string, rack string, ttlString string, fsync bool) filer.SaveDataAsChunkFunctionType {
+
+ return func(reader io.Reader, name string, offset int64) (*filer_pb.FileChunk, string, string, error) {
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(replication, collection, dataCenter, rack, ttlString, fsync)
+ if assignErr != nil {
+ return nil, "", "", assignErr
+ }
+
+ // upload the chunk to the volume server
+ uploadResult, uploadErr, _ := operation.Upload(urlLocation, name, fs.option.Cipher, reader, false, "", nil, auth)
+ if uploadErr != nil {
+ return nil, "", "", uploadErr
+ }
+
+ return uploadResult.ToPbFileChunk(fileId, offset), collection, replication, nil
+ }
+}
+
+func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http.Request) (filerResult *FilerPostResult, replyerr error) {
+
+ // detect file mode
+ modeStr := r.URL.Query().Get("mode")
+ if modeStr == "" {
+ modeStr = "0660"
+ }
+ mode, err := strconv.ParseUint(modeStr, 8, 32)
+ if err != nil {
+ glog.Errorf("Invalid mode format: %s, use 0660 by default", modeStr)
+ mode = 0660
+ }
+
+ // fix the path
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ path = path[:len(path)-1]
+ }
+
+ existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if err == nil && existingEntry != nil {
+ replyerr = fmt.Errorf("dir %s already exists", path)
+ return
+ }
+
+ glog.V(4).Infoln("mkdir", path)
+ entry := &filer.Entry{
+ FullPath: util.FullPath(path),
+ Attr: filer.Attr{
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: os.FileMode(mode) | os.ModeDir,
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }
+
+ filerResult = &FilerPostResult{
+ Name: util.FullPath(path).Name(),
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
+ replyerr = dbErr
+ filerResult.Error = dbErr.Error()
+ glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr)
+ }
+ return filerResult, replyerr
+}
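
saveMetaData and mkdir share the mode handling above: the query-string value is parsed as an octal literal, so mode=0755 yields the expected permission bits, with 0660 as the fallback for missing or malformed input. A small sketch of just that parsing:

    package main

    import (
        "fmt"
        "os"
        "strconv"
    )

    func parseMode(modeStr string) os.FileMode {
        if modeStr == "" {
            modeStr = "0660"
        }
        mode, err := strconv.ParseUint(modeStr, 8, 32) // base 8: "0755" => 0o755
        if err != nil {
            mode = 0660 // fall back on malformed input
        }
        return os.FileMode(mode)
    }

    func main() {
        fmt.Println(parseMode("0755"))              // -rwxr-xr-x
        fmt.Println(parseMode(""))                  // -rw-rw----
        fmt.Println(parseMode("0755") | os.ModeDir) // drwxr-xr-x, as mkdir builds it
    }
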
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index bea72b2c1..720d97027 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -7,7 +7,7 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
@@ -16,10 +16,9 @@ import (
)
// handling single chunk POST or PUT upload
-func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request,
- replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) {
+func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, replication string, collection string, dataCenter string, rack string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) {
- fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync)
+ fileId, urlLocation, auth, err := fs.assignNewFileInfo(replication, collection, dataCenter, rack, ttlString, fsync)
if err != nil || fileId == "" || urlLocation == "" {
return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)
@@ -38,6 +37,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
}
if pu.MimeType == "" {
pu.MimeType = http.DetectContentType(uncompressedData)
+ // println("detect2 mimetype to", pu.MimeType)
}
uploadResult, uploadError := operation.UploadData(urlLocation, pu.FileName, true, uncompressedData, false, pu.MimeType, pu.PairMap, auth)
@@ -57,9 +57,9 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
}
}
- entry := &filer2.Entry{
+ entry := &filer.Entry{
FullPath: util.FullPath(path),
- Attr: filer2.Attr{
+ Attr: filer.Attr{
Mtime: time.Now(),
Crtime: time.Now(),
Mode: 0660,
@@ -69,6 +69,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
Collection: collection,
TtlSec: ttlSeconds,
Mime: pu.MimeType,
+ Md5: util.Base64Md5ToBytes(pu.ContentMd5),
},
Chunks: fileChunks,
}
@@ -78,7 +79,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
Size: int64(pu.OriginalDataSize),
}
- if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
fs.filer.DeleteChunks(entry.Chunks)
err = dbErr
filerResult.Error = dbErr.Error()
diff --git a/weed/server/filer_ui/templates.go b/weed/server/filer_ui/templates.go
index e532b27e2..f86dde5b1 100644
--- a/weed/server/filer_ui/templates.go
+++ b/weed/server/filer_ui/templates.go
@@ -3,18 +3,29 @@ package master_ui
import (
"github.com/dustin/go-humanize"
"html/template"
+ "net/url"
+ "strings"
)
+func printpath(parts ...string) string {
+ concat := strings.Join(parts, "")
+ escaped := url.PathEscape(concat)
+ return strings.ReplaceAll(escaped, "%2F", "/")
+}
+
var funcMap = template.FuncMap{
"humanizeBytes": humanize.Bytes,
+ "printpath": printpath,
}
var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html>
<html>
<head>
- <title>SeaweedFS Filer</title>
- <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
+ <title>SeaweedFS Filer</title>
+ <meta name="viewport" content="width=device-width, initial-scale=1">
+ <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
<style>
+body { padding-bottom: 70px; }
#drop-area {
border: 1px transparent;
}
@@ -37,6 +48,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
#fileElem {
display: none;
}
+.qrImage {
+ display: block;
+ margin-left: auto;
+ margin-right: auto;
+}
</style>
</head>
<body>
@@ -50,7 +66,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<div class="row">
<div>
{{ range $entry := .Breadcrumbs }}
- <a href="{{ $entry.Link }}" >
+ <a href="{{ printpath $entry.Link }}" >
{{ $entry.Name }}
</a>
{{ end }}
@@ -69,11 +85,11 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
<td>
{{if $entry.IsDirectory}}
<img src="/seaweedfsstatic/images/folder.gif" width="20" height="23">
- <a href={{ print $path "/" $entry.Name "/"}} >
+ <a href="{{ printpath $path "/" $entry.Name "/"}}" >
{{ $entry.Name }}
</a>
{{else}}
- <a href={{ print $path "/" $entry.Name }} >
+ <a href="{{ printpath $path "/" $entry.Name }}" >
{{ $entry.Name }}
</a>
{{end}}
@@ -107,6 +123,14 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC
</a>
</div>
{{end}}
+
+ <br/>
+ <br/>
+
+ <div class="navbar navbar-fixed-bottom">
+ <img src="data:image/png;base64,{{.QrImage}}" class="qrImage" />
+ </div>
+
</div>
</body>
<script type="text/javascript">
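
The printpath helper exists because url.PathEscape encodes every reserved character, including the path separators the breadcrumb and entry links must keep, so the helper restores them after escaping. A quick sketch of the behavior on a hypothetical entry name:

    package main

    import (
        "fmt"
        "net/url"
        "strings"
    )

    func printpath(parts ...string) string {
        concat := strings.Join(parts, "")
        escaped := url.PathEscape(concat)
        // PathEscape turns "/" into "%2F"; put the separators back.
        return strings.ReplaceAll(escaped, "%2F", "/")
    }

    func main() {
        fmt.Println(printpath("/docs", "/", "a file#1.txt"))
        // Output: /docs/a%20file%231.txt -- spaces and '#' stay escaped, '/' does not
    }
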
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 1ee214deb..e8fa3995d 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"net"
"strings"
"time"
@@ -12,21 +13,19 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
)
func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
var dn *topology.DataNode
- t := ms.Topo
defer func() {
if dn != nil {
// if the volume server disconnects and reconnects quickly
// the unregister and register can race with each other
- t.UnRegisterDataNode(dn)
+ ms.Topo.UnRegisterDataNode(dn)
glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
message := &master_pb.VolumeLocation{
@@ -62,21 +61,18 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
return err
}
- t.Sequence.SetMax(heartbeat.MaxFileKey)
+ ms.Topo.Sequence.SetMax(heartbeat.MaxFileKey)
if dn == nil {
- dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
- dc := t.GetOrCreateDataCenter(dcName)
+ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn = rack.GetOrCreateDataNode(heartbeat.Ip,
int(heartbeat.Port), heartbeat.PublicUrl,
int64(heartbeat.MaxVolumeCount))
glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
- VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
- MetricsAddress: ms.option.MetricsAddress,
- MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
- StorageBackends: backend.ToPbStorageBackends(),
+ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
}); err != nil {
glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err
@@ -102,12 +98,12 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
message.DeletedVids = append(message.DeletedVids, volInfo.Id)
}
// update master internal volume layouts
- t.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
+ ms.Topo.IncrementalSyncDataNodeRegistration(heartbeat.NewVolumes, heartbeat.DeletedVolumes, dn)
}
if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
// process heartbeat.Volumes
- newVolumes, deletedVolumes := t.SyncDataNodeRegistration(heartbeat.Volumes, dn)
+ newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
for _, v := range newVolumes {
glog.V(0).Infof("master see new volume %d from %s", uint32(v.Id), dn.Url())
@@ -122,7 +118,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
// update master internal volume layouts
- t.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
+ ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
for _, s := range heartbeat.NewEcShards {
message.NewVids = append(message.NewVids, s.Id)
@@ -137,8 +133,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
- glog.V(1).Infof("master recieved ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
- newShards, deletedShards := t.SyncDataNodeEcShards(heartbeat.EcShards, dn)
+ glog.V(1).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
+ newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn)
// broadcast the ec vid changes to master clients
for _, s := range newShards {
@@ -163,7 +159,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
// tell the volume servers about the leader
- newLeader, err := t.Leader()
+ newLeader, err := ms.Topo.Leader()
if err != nil {
glog.Warningf("SendHeartbeat find leader: %v", err)
return err
@@ -192,7 +188,8 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
peerAddress := findClientAddress(stream.Context(), req.GrpcPort)
- stopChan := make(chan bool)
+ // buffer by 1 so we don't end up getting stuck writing to stopChan forever
+ stopChan := make(chan bool, 1)
clientName, messageChan := ms.addClient(req.Name, peerAddress)
@@ -252,7 +249,12 @@ func (ms *MasterServer) addClient(clientType string, clientAddress string) (clie
clientName = clientType + "@" + clientAddress
glog.V(0).Infof("+ client %v", clientName)
- messageChan = make(chan *master_pb.VolumeLocation)
+ // Buffer this channel: otherwise a send can deadlock. If the KeepConnected
+ // loop has stopped receiving on this channel while SendHeartbeat is still
+ // sending to it, the blocked sender holds clientChansLock, so the channel
+ // can never be removed and the write never completes.
+ // A capacity of 100 is probably overkill.
+ messageChan = make(chan *master_pb.VolumeLocation, 100)
ms.clientChansLock.Lock()
ms.clientChans[clientName] = messageChan
@@ -301,3 +303,19 @@ func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.Li
}
return resp, nil
}
+
+func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
+
+ // tell the volume servers about the leader
+ leader, _ := ms.Topo.Leader()
+
+ resp := &master_pb.GetMasterConfigurationResponse{
+ MetricsAddress: ms.option.MetricsAddress,
+ MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
+ StorageBackends: backend.ToPbStorageBackends(),
+ DefaultReplication: ms.option.DefaultReplicaPlacement,
+ Leader: leader,
+ }
+
+ return resp, nil
+}
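
Both channel changes in this file are the same fix: a send on an unbuffered (or full) channel blocks until a receiver is ready, and once the receiving loop has gone away, a sender that is holding a lock blocks forever. A minimal standalone sketch of why the buffer helps, not the actual master code:

    package main

    import (
        "fmt"
        "time"
    )

    func main() {
        unbuffered := make(chan int)
        buffered := make(chan int, 1)

        // No receiver exists for either channel.
        select {
        case unbuffered <- 1:
            fmt.Println("sent on unbuffered") // unreachable
        default:
            fmt.Println("unbuffered send would block forever")
        }

        buffered <- 1 // returns immediately: the value parks in the buffer
        fmt.Println("buffered send returned without a receiver")

        // A receiver can drain the buffer later, after the sender moved on.
        go func() { fmt.Println("received:", <-buffered) }()
        time.Sleep(100 * time.Millisecond)
    }
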
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 282c75679..03b718291 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -3,7 +3,6 @@ package weed_server
import (
"context"
"fmt"
-
"github.com/chrislusf/raft"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
@@ -178,13 +177,3 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku
return resp, nil
}
-
-func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
-
- resp := &master_pb.GetMasterConfigurationResponse{
- MetricsAddress: ms.option.MetricsAddress,
- MetricsIntervalSeconds: uint32(ms.option.MetricsIntervalSec),
- }
-
- return resp, nil
-}
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index 9a490bb1f..cc1c4b2ad 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -7,7 +7,6 @@ import (
"net/url"
"os"
"regexp"
- "strconv"
"strings"
"sync"
"time"
@@ -32,11 +31,11 @@ const (
)
type MasterOption struct {
- Host string
- Port int
- MetaFolder string
- VolumeSizeLimitMB uint
- VolumePreallocate bool
+ Host string
+ Port int
+ MetaFolder string
+ VolumeSizeLimitMB uint
+ VolumePreallocate bool
// PulseSeconds int
DefaultReplicaPlacement string
GarbageThreshold float64
@@ -66,7 +65,7 @@ type MasterServer struct {
MasterClient *wdclient.MasterClient
- adminLocks *AdminLocks
+ adminLocks *AdminLocks
}
func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
@@ -139,14 +138,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
ms.Topo.RaftServer = raftServer.raftServer
ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
- glog.V(0).Infof("event: %+v", e)
+ glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
if ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
}
})
- ms.Topo.RaftServer.AddEventListener(raft.StateChangeEventType, func(e raft.Event) {
- glog.V(0).Infof("state change: %+v", e)
- })
if ms.Topo.IsLeader() {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
} else {
@@ -210,7 +206,7 @@ func (ms *MasterServer) startAdminScripts() {
scriptLines = append(scriptLines, "unlock")
}
- masterAddress := "localhost:" + strconv.Itoa(ms.option.Port)
+ masterAddress := fmt.Sprintf("%s:%d", ms.option.Host, ms.option.Port)
var shellOptions shell.ShellOptions
shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 7595c0171..34235384f 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -110,7 +110,7 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
} else {
url = util.NormalizeUrl(loc.PublicUrl) + r.URL.Path
}
- http.Redirect(w, r, url, http.StatusMovedPermanently)
+ http.Redirect(w, r, url, http.StatusPermanentRedirect)
} else {
writeJsonError(w, r, http.StatusNotFound, fmt.Errorf("volume id %s not found: %s", vid, location.Error))
}
diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go
index 7189064d0..60873f6aa 100644
--- a/weed/server/master_ui/templates.go
+++ b/weed/server/master_ui/templates.go
@@ -88,7 +88,11 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html>
<tr>
<td><code>{{ $dc.Id }}</code></td>
<td>{{ $rack.Id }}</td>
- <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a></td>
+ <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a>
+ {{ if ne $dn.PublicUrl $dn.Url }}
+ / <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a>
+ {{ end }}
+ </td>
<td>{{ $dn.Volumes }}</td>
<td>{{ $dn.VolumeIds}}</td>
<td>{{ $dn.EcShards }}</td>
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 0381c7feb..85841e409 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,10 +2,9 @@ package weed_server
import (
"encoding/json"
- "io/ioutil"
+ "math/rand"
"os"
"path"
- "reflect"
"sort"
"time"
@@ -28,7 +27,31 @@ type RaftServer struct {
*raft.GrpcServer
}
-func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, pulseSeconds int) *RaftServer {
+type StateMachine struct {
+ raft.StateMachine
+ topo *topology.Topology
+}
+
+func (s StateMachine) Save() ([]byte, error) {
+ state := topology.MaxVolumeIdCommand{
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
+ }
+ glog.V(1).Infof("Save raft state %+v", state)
+ return json.Marshal(state)
+}
+
+func (s StateMachine) Recovery(data []byte) error {
+ state := topology.MaxVolumeIdCommand{}
+ err := json.Unmarshal(data, &state)
+ if err != nil {
+ return err
+ }
+ glog.V(1).Infof("Recovery raft state %+v", state)
+ s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
+ return nil
+}
+
+func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) {
s := &RaftServer{
peers: peers,
serverAddr: serverAddr,
@@ -46,47 +69,66 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
transporter := raft.NewGrpcTransporter(grpcDialOption)
glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
- // Clear old cluster configurations if peers are changed
- if oldPeers, changed := isPeersChanged(s.dataDir, serverAddr, s.peers); changed {
- glog.V(0).Infof("Peers Change: %v => %v", oldPeers, s.peers)
+ if !raftResumeState {
+ // always clear previous metadata
os.RemoveAll(path.Join(s.dataDir, "conf"))
os.RemoveAll(path.Join(s.dataDir, "log"))
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
+ // 0700, not 0600: the snapshot directory needs the execute bit to be traversable
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0700); err != nil {
+ return nil, err
+ }
- s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, nil, topo, "")
+ stateMachine := StateMachine{topo: topo}
+ s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
if err != nil {
glog.V(0).Infoln(err)
- return nil
+ return nil, err
+ }
+ s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond)
+ s.raftServer.SetElectionTimeout(10 * time.Second)
+ if err := s.raftServer.LoadSnapshot(); err != nil {
+ return nil, err
+ }
+ if err := s.raftServer.Start(); err != nil {
+ return nil, err
}
- s.raftServer.SetHeartbeatInterval(500 * time.Millisecond)
- s.raftServer.SetElectionTimeout(time.Duration(pulseSeconds) * 500 * time.Millisecond)
- s.raftServer.Start()
for _, peer := range s.peers {
- s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer))
+ if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil {
+ return nil, err
+ }
+ }
+
+ // Remove deleted peers
+ for existsPeerName := range s.raftServer.Peers() {
+ exists := false
+ for _, peer := range s.peers {
+ if pb.ServerToGrpcAddress(peer) == existsPeerName {
+ exists = true
+ break
+ }
+ }
+ // only remove peers that are no longer in the configured peer list
+ if !exists {
+ if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
+ glog.V(0).Infoln(err)
+ return nil, err
+ } else {
+ glog.V(0).Infof("removing old peer %s", existsPeerName)
+ }
+ }
}
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
// Initialize the server by joining itself.
- glog.V(0).Infoln("Initializing new cluster")
-
- _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
- Name: s.raftServer.Name(),
- ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
- })
-
- if err != nil {
- glog.V(0).Infoln(err)
- return nil
- }
+ // s.DoJoinCommand()
}
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
- return s
+ return s, nil
}
func (s *RaftServer) Peers() (members []string) {
@@ -99,34 +141,6 @@ func (s *RaftServer) Peers() (members []string) {
return
}
-func isPeersChanged(dir string, self string, peers []string) (oldPeers []string, changed bool) {
- confPath := path.Join(dir, "conf")
- // open conf file
- b, err := ioutil.ReadFile(confPath)
- if err != nil {
- return oldPeers, true
- }
- conf := &raft.Config{}
- if err = json.Unmarshal(b, conf); err != nil {
- return oldPeers, true
- }
-
- for _, p := range conf.Peers {
- oldPeers = append(oldPeers, p.Name)
- }
- oldPeers = append(oldPeers, self)
-
- if len(peers) == 0 && len(oldPeers) <= 1 {
- return oldPeers, false
- }
-
- sort.Strings(peers)
- sort.Strings(oldPeers)
-
- return oldPeers, !reflect.DeepEqual(peers, oldPeers)
-
-}
-
func isTheFirstOne(self string, peers []string) bool {
sort.Strings(peers)
if len(peers) <= 0 {
@@ -134,3 +148,16 @@ func isTheFirstOne(self string, peers []string) bool {
}
return self == peers[0]
}
+
+func (s *RaftServer) DoJoinCommand() {
+
+ glog.V(0).Infoln("Initializing new cluster")
+
+ if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
+ Name: s.raftServer.Name(),
+ ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
+ }); err != nil {
+ glog.Errorf("fail to send join command: %v", err)
+ }
+
+}
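
Save and Recovery are JSON inverses, so the max volume id survives a snapshot/restore cycle. A round-trip sketch with a stand-in struct (the real topology.MaxVolumeIdCommand is mimicked here, field name and json tag included, as an assumption rather than an import):

    package main

    import (
        "encoding/json"
        "fmt"
    )

    // Stand-in for topology.MaxVolumeIdCommand.
    type maxVolumeIdCommand struct {
        MaxVolumeId uint32 `json:"maxVolumeId"`
    }

    func save(maxVolumeId uint32) ([]byte, error) {
        return json.Marshal(maxVolumeIdCommand{MaxVolumeId: maxVolumeId})
    }

    func recovery(data []byte) (uint32, error) {
        state := maxVolumeIdCommand{}
        if err := json.Unmarshal(data, &state); err != nil {
            return 0, err
        }
        return state.MaxVolumeId, nil
    }

    func main() {
        snapshot, err := save(42)
        if err != nil {
            panic(err)
        }
        fmt.Printf("snapshot: %s\n", snapshot) // {"maxVolumeId":42}

        restored, err := recovery(snapshot)
        if err != nil {
            panic(err)
        }
        fmt.Println("restored max volume id:", restored) // 42
    }
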
diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go
index fd38cb977..252570eab 100644
--- a/weed/server/raft_server_handlers.go
+++ b/weed/server/raft_server_handlers.go
@@ -1,20 +1,24 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
"net/http"
)
type ClusterStatusResult struct {
- IsLeader bool `json:"IsLeader,omitempty"`
- Leader string `json:"Leader,omitempty"`
- Peers []string `json:"Peers,omitempty"`
+ IsLeader bool `json:"IsLeader,omitempty"`
+ Leader string `json:"Leader,omitempty"`
+ Peers []string `json:"Peers,omitempty"`
+ MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"`
}
func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
ret := ClusterStatusResult{
- IsLeader: s.topo.IsLeader(),
- Peers: s.Peers(),
+ IsLeader: s.topo.IsLeader(),
+ Peers: s.Peers(),
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
}
+
if leader, e := s.topo.Leader(); e == nil {
ret.Leader = leader
}
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index 27b21ac09..9296c63e9 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -10,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/super_block"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
)
func (vs *VolumeServer) DeleteCollection(ctx context.Context, req *volume_server_pb.DeleteCollectionRequest) (*volume_server_pb.DeleteCollectionResponse, error) {
@@ -148,7 +149,35 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv
}
return resp, err
+}
+
+func (vs *VolumeServer) VolumeMarkWritable(ctx context.Context, req *volume_server_pb.VolumeMarkWritableRequest) (*volume_server_pb.VolumeMarkWritableResponse, error) {
+
+ resp := &volume_server_pb.VolumeMarkWritableResponse{}
+ err := vs.store.MarkVolumeWritable(needle.VolumeId(req.VolumeId))
+
+ if err != nil {
+ glog.Errorf("volume mark writable %v: %v", req, err)
+ } else {
+ glog.V(2).Infof("volume mark writable %v", req)
+ }
+
+ return resp, err
+}
+
+func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.VolumeStatusRequest) (*volume_server_pb.VolumeStatusResponse, error) {
+
+ resp := &volume_server_pb.VolumeStatusResponse{}
+
+ v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
+ if v == nil {
+ return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
+ }
+
+ resp.IsReadOnly = v.IsReadOnly()
+
+ return resp, nil
}
func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
@@ -166,3 +195,54 @@ func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_serv
return resp, nil
}
+
+func (vs *VolumeServer) VolumeServerLeave(ctx context.Context, req *volume_server_pb.VolumeServerLeaveRequest) (*volume_server_pb.VolumeServerLeaveResponse, error) {
+
+ resp := &volume_server_pb.VolumeServerLeaveResponse{}
+
+ vs.StopHeartbeat()
+
+ return resp, nil
+
+}
+
+func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_server_pb.VolumeNeedleStatusRequest) (*volume_server_pb.VolumeNeedleStatusResponse, error) {
+
+ resp := &volume_server_pb.VolumeNeedleStatusResponse{}
+
+ volumeId := needle.VolumeId(req.VolumeId)
+
+ n := &needle.Needle{
+ Id: types.NeedleId(req.NeedleId),
+ }
+
+ var count int
+ var err error
+ hasVolume := vs.store.HasVolume(volumeId)
+ if !hasVolume {
+ _, hasEcVolume := vs.store.FindEcVolume(volumeId)
+ if !hasEcVolume {
+ return nil, fmt.Errorf("volume not found %d", req.VolumeId)
+ }
+ count, err = vs.store.ReadEcShardNeedle(volumeId, n)
+ } else {
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n, nil)
+ }
+ if err != nil {
+ return nil, err
+ }
+ if count < 0 {
+ return nil, fmt.Errorf("needle not found %d", n.Id)
+ }
+
+ resp.NeedleId = uint64(n.Id)
+ resp.Cookie = uint32(n.Cookie)
+ resp.Size = uint32(n.Size)
+ resp.LastModified = n.LastModified
+ resp.Crc = n.Checksum.Value()
+ if n.HasTtl() {
+ resp.Ttl = n.Ttl.String()
+ }
+ return resp, nil
+
+}
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go
index 501964191..8e84dc2a8 100644
--- a/weed/server/volume_grpc_batch_delete.go
+++ b/weed/server/volume_grpc_batch_delete.go
@@ -41,7 +41,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
} else {
n.ParsePath(id_cookie)
cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusNotFound,
@@ -79,7 +79,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusAccepted,
- Size: size},
+ Size: uint32(size)},
)
}
}
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index 7cb836344..199f8faba 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,7 +2,7 @@ package weed_server
import (
"fmt"
- "net"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"time"
"google.golang.org/grpc"
@@ -22,6 +22,31 @@ import (
func (vs *VolumeServer) GetMaster() string {
return vs.currentMaster
}
+
+func (vs *VolumeServer) checkWithMaster() (err error) {
+ isConnected := false
+ for !isConnected {
+ for _, master := range vs.SeedMasterNodes {
+ err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", master, err)
+ }
+ vs.metricsAddress, vs.metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds)
+ backend.LoadFromPbStorageBackends(resp.StorageBackends)
+ return nil
+ })
+ if err == nil {
+ return
+ } else {
+ glog.V(0).Infof("checkWithMaster %s: %v", master, err)
+ }
+ }
+ time.Sleep(1790 * time.Millisecond)
+ }
+ return
+}
+
func (vs *VolumeServer) heartbeat() {
glog.V(0).Infof("Volume server start with seed master nodes: %v", vs.SeedMasterNodes)
@@ -32,7 +57,7 @@ func (vs *VolumeServer) heartbeat() {
var err error
var newLeader string
- for {
+ for vs.isHeartbeating {
for _, master := range vs.SeedMasterNodes {
if newLeader != "" {
// the new leader may actually is the same master
@@ -53,20 +78,35 @@ func (vs *VolumeServer) heartbeat() {
newLeader = ""
vs.store.MasterAddress = ""
}
+ if !vs.isHeartbeating {
+ break
+ }
}
}
}
+func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
+ if !vs.isHeartbeating {
+ return true
+ }
+ vs.isHeartbeating = false
+ close(vs.stopChan)
+ return false
+}
+
func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
- grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption)
+ ctx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+
+ grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
if err != nil {
return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
}
defer grpcConection.Close()
client := master_pb.NewSeaweedClient(grpcConection)
- stream, err := client.SendHeartbeat(context.Background())
+ stream, err := client.SendHeartbeat(ctx)
if err != nil {
glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
return "", err
@@ -87,23 +127,16 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
if vs.store.MaybeAdjustVolumeMax() {
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
}
}
}
- if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) {
- glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode)
+ if in.GetLeader() != "" && vs.currentMaster != in.GetLeader() {
+ glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
newLeader = in.GetLeader()
doneChan <- nil
return
}
- if in.GetMetricsAddress() != "" && vs.MetricsAddress != in.GetMetricsAddress() {
- vs.MetricsAddress = in.GetMetricsAddress()
- vs.MetricsIntervalSec = int(in.GetMetricsIntervalSeconds())
- }
- if len(in.StorageBackends) > 0 {
- backend.LoadFromPbStorageBackends(in.StorageBackends)
- }
}
}()
@@ -182,19 +215,8 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
}
case err = <-doneChan:
return
+ case <-vs.stopChan:
+ return
}
}
}
-
-func isSameIP(ip string, host string) bool {
- ips, err := net.LookupIP(host)
- if err != nil {
- return false
- }
- for _, t := range ips {
- if ip == t.String() {
- return true
- }
- }
- return false
-}
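
StopHeartbeat relies on close(vs.stopChan) rather than a send: a receive from a closed channel always succeeds immediately, so every select waiting on the channel unblocks at once, now and in the future. The pattern in isolation, as a sketch rather than the volume server code:

    package main

    import (
        "fmt"
        "time"
    )

    func worker(stopChan, done chan struct{}) {
        ticker := time.NewTicker(50 * time.Millisecond)
        defer ticker.Stop()
        for {
            select {
            case <-ticker.C:
                fmt.Println("heartbeat tick")
            case <-stopChan: // a closed channel is always ready to receive
                fmt.Println("stop requested, shutting down")
                close(done)
                return
            }
        }
    }

    func main() {
        stopChan := make(chan struct{})
        done := make(chan struct{})
        go worker(stopChan, done)

        time.Sleep(120 * time.Millisecond)
        close(stopChan) // broadcast: unblocks the worker's select
        <-done
    }
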
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 5c7d5572c..17372eef4 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"io"
+ "io/ioutil"
"math"
"os"
"time"
@@ -27,17 +28,12 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId)
- err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId))
- if err != nil {
- return nil, fmt.Errorf("failed to mount existing volume %d: %v", req.VolumeId, err)
- }
-
- err = vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
+ err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
if err != nil {
return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
}
- glog.V(0).Infof("deleted exisitng volume %d before copying.", req.VolumeId)
+ glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
}
location := vs.store.FindFreeLocation()
@@ -65,13 +61,14 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
volumeFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
+ ioutil.WriteFile(volumeFileName + ".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755)
+
// println("source:", volFileInfoResp.String())
- // copy ecx file
- if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
return err
}
- if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil {
+ if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil {
return err
}
@@ -79,6 +76,8 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
return err
}
+ os.Remove(volumeFileName + ".note")
+
return nil
})
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 66dd5bf8d..55e0261c8 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -38,6 +38,8 @@ Steps to apply erasure coding to .dat .idx files
// VolumeEcShardsGenerate generates the .ecx and .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_server_pb.VolumeEcShardsGenerateRequest) (*volume_server_pb.VolumeEcShardsGenerateResponse, error) {
+ glog.V(0).Infof("VolumeEcShardsGenerate: %v", req)
+
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v == nil {
return nil, fmt.Errorf("volume %d not found", req.VolumeId)
@@ -48,16 +50,16 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
}
- // write .ecx file
- if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
- return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
- }
-
// write .ec00 ~ .ec13 files
if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
}
+ // write .ecx file
+ if err := erasure_coding.WriteSortedFileFromIdx(baseFileName, ".ecx"); err != nil {
+ return nil, fmt.Errorf("WriteSortedFileFromIdx %s: %v", baseFileName, err)
+ }
+
// write .vif files
if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
@@ -69,6 +71,8 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
// VolumeEcShardsRebuild generates any of the missing .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_server_pb.VolumeEcShardsRebuildRequest) (*volume_server_pb.VolumeEcShardsRebuildResponse, error) {
+ glog.V(0).Infof("VolumeEcShardsRebuild: %v", req)
+
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId))
var rebuiltShardIds []uint32
@@ -99,6 +103,8 @@ func (vs *VolumeServer) VolumeEcShardsRebuild(ctx context.Context, req *volume_s
// VolumeEcShardsCopy copies the .ecx file and some ec data slices
func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_server_pb.VolumeEcShardsCopyRequest) (*volume_server_pb.VolumeEcShardsCopyResponse, error) {
+ glog.V(0).Infof("VolumeEcShardsCopy: %v", req)
+
location := vs.store.FindFreeLocation()
if location == nil {
return nil, fmt.Errorf("no space left")
@@ -201,9 +207,7 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
if err := os.Remove(baseFilename + ".ecx"); err != nil {
return nil, err
}
- if err := os.Remove(baseFilename + ".ecj"); err != nil {
- return nil, err
- }
+ os.Remove(baseFilename + ".ecj")
}
if !hasIdxFile {
// .vif is used for ec volumes and normal volumes
@@ -215,6 +219,8 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_server_pb.VolumeEcShardsMountRequest) (*volume_server_pb.VolumeEcShardsMountResponse, error) {
+ glog.V(0).Infof("VolumeEcShardsMount: %v", req)
+
for _, shardId := range req.ShardIds {
err := vs.store.MountEcShards(req.Collection, needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
@@ -234,6 +240,8 @@ func (vs *VolumeServer) VolumeEcShardsMount(ctx context.Context, req *volume_ser
func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_server_pb.VolumeEcShardsUnmountRequest) (*volume_server_pb.VolumeEcShardsUnmountResponse, error) {
+ glog.V(0).Infof("VolumeEcShardsUnmount: %v", req)
+
for _, shardId := range req.ShardIds {
err := vs.store.UnmountEcShards(needle.VolumeId(req.VolumeId), erasure_coding.ShardId(shardId))
@@ -264,7 +272,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
if req.FileKey != 0 {
_, size, _ := ecVolume.FindNeedleFromEcx(types.Uint64ToNeedleId(req.FileKey))
- if size == types.TombstoneFileSize {
+ if size.IsDeleted() {
return stream.Send(&volume_server_pb.VolumeEcShardReadResponse{
IsDeleted: true,
})
@@ -321,6 +329,8 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_server_pb.VolumeEcBlobDeleteRequest) (*volume_server_pb.VolumeEcBlobDeleteResponse, error) {
+ glog.V(0).Infof("VolumeEcBlobDelete: %v", req)
+
resp := &volume_server_pb.VolumeEcBlobDeleteResponse{}
for _, location := range vs.store.Locations {
@@ -330,7 +340,7 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
if err != nil {
return nil, fmt.Errorf("locate in local ec volume: %v", err)
}
- if size == types.TombstoneFileSize {
+ if size.IsDeleted() {
return resp, nil
}
@@ -349,6 +359,8 @@ func (vs *VolumeServer) VolumeEcBlobDelete(ctx context.Context, req *volume_serv
// VolumeEcShardsToVolume generates the .idx, .dat files from .ecx, .ecj and .ec00 ~ .ec13 files
func (vs *VolumeServer) VolumeEcShardsToVolume(ctx context.Context, req *volume_server_pb.VolumeEcShardsToVolumeRequest) (*volume_server_pb.VolumeEcShardsToVolumeResponse, error) {
+ glog.V(0).Infof("VolumeEcShardsToVolume: %v", req)
+
v, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
if !found {
return nil, fmt.Errorf("ec volume %d not found", req.VolumeId)
diff --git a/weed/server/volume_grpc_file.go b/weed/server/volume_grpc_file.go
deleted file mode 100644
index 4d71ddeb1..000000000
--- a/weed/server/volume_grpc_file.go
+++ /dev/null
@@ -1,129 +0,0 @@
-package weed_server
-
-import (
- "encoding/json"
- "net/http"
- "strings"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/util"
-)
-
-func (vs *VolumeServer) FileGet(req *volume_server_pb.FileGetRequest, stream volume_server_pb.VolumeServer_FileGetServer) error {
-
- headResponse := &volume_server_pb.FileGetResponse{}
- n := new(needle.Needle)
-
- commaIndex := strings.LastIndex(req.FileId, ",")
- vid := req.FileId[:commaIndex]
- fid := req.FileId[commaIndex+1:]
-
- volumeId, err := needle.NewVolumeId(vid)
- if err != nil {
- headResponse.ErrorCode = http.StatusBadRequest
- return stream.Send(headResponse)
- }
- err = n.ParsePath(fid)
- if err != nil {
- headResponse.ErrorCode = http.StatusBadRequest
- return stream.Send(headResponse)
- }
-
- hasVolume := vs.store.HasVolume(volumeId)
- _, hasEcVolume := vs.store.FindEcVolume(volumeId)
-
- if !hasVolume && !hasEcVolume {
- headResponse.ErrorCode = http.StatusMovedPermanently
- return stream.Send(headResponse)
- }
-
- cookie := n.Cookie
- var count int
- if hasVolume {
- count, err = vs.store.ReadVolumeNeedle(volumeId, n)
- } else if hasEcVolume {
- count, err = vs.store.ReadEcShardNeedle(volumeId, n)
- }
-
- if err != nil || count < 0 {
- headResponse.ErrorCode = http.StatusNotFound
- return stream.Send(headResponse)
- }
- if n.Cookie != cookie {
- headResponse.ErrorCode = http.StatusNotFound
- return stream.Send(headResponse)
- }
-
- if n.LastModified != 0 {
- headResponse.LastModified = n.LastModified
- }
-
- headResponse.Etag = n.Etag()
-
- if n.HasPairs() {
- pairMap := make(map[string]string)
- err = json.Unmarshal(n.Pairs, &pairMap)
- if err != nil {
- glog.V(0).Infoln("Unmarshal pairs error:", err)
- }
- headResponse.Headers = pairMap
- }
-
- /*
- // skip this, no redirection
- if vs.tryHandleChunkedFile(n, filename, w, r) {
- return
- }
- */
-
- if n.NameSize > 0 {
- headResponse.Filename = string(n.Name)
- }
- mtype := ""
- if n.MimeSize > 0 {
- mt := string(n.Mime)
- if !strings.HasPrefix(mt, "application/octet-stream") {
- mtype = mt
- }
- }
- headResponse.ContentType = mtype
-
- headResponse.IsGzipped = n.IsGzipped()
-
- if n.IsGzipped() && req.AcceptGzip {
- if n.Data, err = util.UnGzipData(n.Data); err != nil {
- glog.V(0).Infof("ungzip %s error: %v", req.FileId, err)
- }
- }
-
- headResponse.ContentLength = uint32(len(n.Data))
- bytesToRead := len(n.Data)
- bytesRead := 0
-
- t := headResponse
-
- for bytesRead < bytesToRead {
-
- stopIndex := bytesRead + BufferSizeLimit
- if stopIndex > bytesToRead {
- stopIndex = bytesToRead
- }
-
- if t == nil {
- t = &volume_server_pb.FileGetResponse{}
- }
- t.Data = n.Data[bytesRead:stopIndex]
-
- err = stream.Send(t)
- t = nil
- if err != nil {
- return err
- }
-
- bytesRead = stopIndex
- }
-
- return nil
-}
diff --git a/weed/server/volume_grpc_query.go b/weed/server/volume_grpc_query.go
index 767e28e7b..2f4fab96a 100644
--- a/weed/server/volume_grpc_query.go
+++ b/weed/server/volume_grpc_query.go
@@ -24,7 +24,7 @@ func (vs *VolumeServer) Query(req *volume_server_pb.QueryRequest, stream volume_
n.ParsePath(id_cookie)
cookie := n.Cookie
- if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil {
+ if _, err := vs.store.ReadVolumeNeedle(volumeId, n, nil); err != nil {
glog.V(0).Infof("volume query failed to read fid %s: %v", fid, err)
return err
}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 62fbc19a7..83df32fdd 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -28,14 +28,16 @@ type VolumeServer struct {
FixJpgOrientation bool
ReadRedirect bool
compactionBytePerSecond int64
- MetricsAddress string
- MetricsIntervalSec int
+ metricsAddress string
+ metricsIntervalSec int
fileSizeLimitBytes int64
+ isHeartbeating bool
+ stopChan chan bool
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
port int, publicUrl string,
- folders []string, maxCounts []int, minFreeSpacePercent []float32,
+ folders []string, maxCounts []int, minFreeSpacePercents []float32,
needleMapKind storage.NeedleMapType,
masterNodes []string, pulseSeconds int,
dataCenter string, rack string,
@@ -66,16 +68,21 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.volume"),
compactionBytePerSecond: int64(compactionMBPerSecond) * 1024 * 1024,
fileSizeLimitBytes: int64(fileSizeLimitMB) * 1024 * 1024,
+ isHeartbeating: true,
+ stopChan: make(chan bool),
}
vs.SeedMasterNodes = masterNodes
- vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercent, vs.needleMapKind)
+
+ vs.checkWithMaster()
+
+ vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpacePercents, vs.needleMapKind)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux)
+ adminMux.HandleFunc("/status", vs.statusHandler)
if signingKey == "" || enableUiAccess {
// only expose the volume server details for safe environments
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
- adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler))
/*
adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler))
adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler))
@@ -90,11 +97,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
}
go vs.heartbeat()
- hostAddress := fmt.Sprintf("%s:%d", ip, port)
- go stats.LoopPushingMetric("volumeServer", hostAddress, stats.VolumeServerGather,
- func() (addr string, intervalSeconds int) {
- return vs.MetricsAddress, vs.MetricsIntervalSec
- })
+ go stats.LoopPushingMetric("volumeServer", fmt.Sprintf("%s:%d", ip, port), vs.metricsAddress, vs.metricsIntervalSec)
return vs
}
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index 14ad27d42..ad13cdf3b 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"strings"
@@ -25,6 +26,7 @@ security settings:
*/
func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
switch r.Method {
case "GET", "HEAD":
stats.ReadRequest()
@@ -39,6 +41,7 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
}
func (vs *VolumeServer) publicReadOnlyHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
switch r.Method {
case "GET":
stats.ReadRequest()
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 34655d833..4d84c9c4d 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -10,6 +10,7 @@ import (
)
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
m := make(map[string]interface{})
m["Version"] = util.Version()
var ds []*volume_server_pb.DiskStatus
@@ -24,6 +25,7 @@ func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
}
func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
m := make(map[string]interface{})
m["Version"] = util.Version()
var ds []*volume_server_pb.DiskStatus
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 19b459136..15fd446e7 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -18,6 +18,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -26,6 +27,8 @@ var fileNameEscaper = strings.NewReplacer("\\", "\\\\", "\"", "\\\"")
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
+ // println(r.Method + " " + r.URL.Path)
+
stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
start := time.Now()
defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
@@ -79,15 +82,24 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
cookie := n.Cookie
+
+ readOption := &storage.ReadOption{
+ ReadDeleted: r.FormValue("readDeleted") == "true",
+ }
+
var count int
if hasVolume {
- count, err = vs.store.ReadVolumeNeedle(volumeId, n)
+ count, err = vs.store.ReadVolumeNeedle(volumeId, n, readOption)
} else if hasEcVolume {
count, err = vs.store.ReadEcShardNeedle(volumeId, n)
}
+ if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume {
+ glog.V(4).Infof("read needle: %v", err)
+ // TODO: repair from another replica: the volume is local, the needle is not deleted, and this is not a replication request
+ }
// glog.V(4).Infoln("read bytes", count, "error", err)
if err != nil || count < 0 {
- glog.V(0).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
+ glog.V(3).Infof("read %s isNormalVolume %v error: %v", r.URL.Path, hasVolume, err)
w.WriteHeader(http.StatusNotFound)
return
}
@@ -142,20 +154,18 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- if ext != ".gz" {
- if n.IsGzipped() {
- if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") {
- if _, _, _, shouldResize := shouldResizeImages(ext, r); shouldResize {
- if n.Data, err = util.UnGzipData(n.Data); err != nil {
- glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
- }
- } else {
- w.Header().Set("Content-Encoding", "gzip")
- }
- } else {
- if n.Data, err = util.UnGzipData(n.Data); err != nil {
- glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
- }
+ if n.IsCompressed() {
+ if _, _, _, shouldResize := shouldResizeImages(ext, r); shouldResize {
+ if n.Data, err = util.DecompressData(n.Data); err != nil {
+ glog.V(0).Infoln("ungzip error:", err, r.URL.Path)
+ }
+ } else if strings.Contains(r.Header.Get("Accept-Encoding"), "zstd") && util.IsZstdContent(n.Data) {
+ w.Header().Set("Content-Encoding", "zstd")
+ } else if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") && util.IsGzippedContent(n.Data) {
+ w.Header().Set("Content-Encoding", "gzip")
+ } else {
+ if n.Data, err = util.DecompressData(n.Data); err != nil {
+ glog.V(0).Infoln("uncompress error:", err, r.URL.Path)
}
}
}
@@ -172,7 +182,7 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string,
return false
}
- chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped())
+ chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsCompressed())
if e != nil {
glog.V(0).Infof("load chunked manifest (%s) error: %v", r.URL.Path, e)
return false
@@ -208,7 +218,9 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string,
func conditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext string, r *http.Request) io.ReadSeeker {
rs := originalDataReaderSeeker
-
+ if len(ext) > 0 {
+ ext = strings.ToLower(ext)
+ }
width, height, mode, shouldResize := shouldResizeImages(ext, r)
if shouldResize {
rs, _, _ = images.Resized(ext, originalDataReaderSeeker, width, height, mode)
@@ -217,9 +229,6 @@ func conditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext strin
}
func shouldResizeImages(ext string, r *http.Request) (width, height int, mode string, shouldResize bool) {
- if len(ext) > 0 {
- ext = strings.ToLower(ext)
- }
if ext == ".png" || ext == ".jpg" || ext == ".jpeg" || ext == ".gif" {
if r.FormValue("width") != "" {
width, _ = strconv.Atoi(r.FormValue("width"))
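With the lowering moved into conditionallyResizeImages, shouldResizeImages now appears to expect an already-lowercased extension. A usage sketch under that assumption (r and rs are placeholders from the surrounding handler):

    // Assumes ext was lowercased by the caller, as conditionallyResizeImages
    // now does; ".JPG" would no longer match here.
    width, height, mode, ok := shouldResizeImages(".jpg", r)
    if ok {
        rs, _, _ = images.Resized(".jpg", rs, width, height, mode)
    }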
@@ -245,13 +254,13 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
}
w.Header().Set("Accept-Ranges", "bytes")
+ adjustHeaderContentDisposition(w, r, filename)
+
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
return nil
}
- adjustHeadersAfterHEAD(w, r, filename)
-
processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
if _, e = rs.Seek(offset, 0); e != nil {
return e
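Moving adjustHeaderContentDisposition above the early return means HEAD responses now carry the same Content-Disposition as GET. A minimal sketch to confirm (assumed import net/http; the URL is a placeholder):

    func headDisposition(url string) (string, error) {
        resp, err := http.Head(url)
        if err != nil {
            return "", err
        }
        resp.Body.Close()
        // "inline" by default, "attachment; filename=..." when dl is set
        return resp.Header.Get("Content-Disposition"), nil
    }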
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index 8b2027e7b..e535327e2 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -13,6 +13,7 @@ import (
)
func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
infos := make(map[string]interface{})
infos["Up Time"] = time.Now().Sub(startTime).String()
var ds []*volume_server_pb.DiskStatus
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 9a00dcc29..01a77b901 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -13,6 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/topology"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
@@ -42,7 +43,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
return
}
- reqNeedle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
+ reqNeedle, originalSize, contentMd5, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes)
if ne != nil {
writeJsonError(w, r, http.StatusBadRequest, ne)
return
@@ -67,9 +68,10 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
ret.Name = string(reqNeedle.Name)
}
ret.Size = uint32(originalSize)
- ret.ETag = reqNeedle.Etag()
+ ret.ETag = fmt.Sprintf("%x", util.Base64Md5ToBytes(contentMd5))
ret.Mime = string(reqNeedle.Mime)
setEtag(w, ret.ETag)
+ w.Header().Set("Content-MD5", contentMd5)
writeJsonQuiet(w, r, httpStatus, ret)
}
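CreateNeedleFromRequest now also returns the base64 Content-MD5 of the uploaded body, and the ETag becomes the hex form of the same digest. The equivalence, restated with the standard library (Base64Md5ToBytes is the helper used above; the rest is stdlib):

    // Assumed imports: crypto/md5, encoding/base64, fmt.
    func md5Forms(body []byte) (contentMd5, etag string) {
        sum := md5.Sum(body)
        contentMd5 = base64.StdEncoding.EncodeToString(sum[:]) // Content-MD5 header
        etag = fmt.Sprintf("%x", sum[:])                       // hex ETag, as above
        return
    }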
@@ -103,7 +105,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
return
}
- _, ok := vs.store.ReadVolumeNeedle(volumeId, n)
+ _, ok := vs.store.ReadVolumeNeedle(volumeId, n, nil)
if ok != nil {
m := make(map[string]uint32)
m["size"] = 0
@@ -120,7 +122,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
count := int64(n.Size)
if n.IsChunkedManifest() {
- chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsGzipped())
+ chunkManifest, e := operation.LoadChunkManifest(n.Data, n.IsCompressed())
if e != nil {
writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Load chunks manifest error: %v", e))
return
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index d86664542..3e9f882e3 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -10,7 +10,6 @@ import (
"strings"
"time"
- "github.com/chrislusf/seaweedfs/weed/util/grace"
"golang.org/x/net/webdav"
"google.golang.org/grpc"
@@ -20,7 +19,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/util/chunk_cache"
- "github.com/chrislusf/seaweedfs/weed/filer2"
+ "github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/security"
)
@@ -42,7 +41,7 @@ type WebDavOption struct {
type WebDavServer struct {
option *WebDavOption
secret security.SigningKey
- filer *filer2.Filer
+ filer *filer.Filer
grpcDialOption grpc.DialOption
Handler *webdav.Handler
}
@@ -68,9 +67,10 @@ func NewWebDavServer(option *WebDavOption) (ws *WebDavServer, err error) {
type WebDavFileSystem struct {
option *WebDavOption
secret security.SigningKey
- filer *filer2.Filer
+ filer *filer.Filer
grpcDialOption grpc.DialOption
- chunkCache *chunk_cache.ChunkCache
+ chunkCache *chunk_cache.TieredChunkCache
+ signature int32
}
type FileInfo struct {
@@ -94,19 +94,17 @@ type WebDavFile struct {
isDirectory bool
off int64
entry *filer_pb.Entry
- entryViewCache []filer2.VisibleInterval
+ entryViewCache []filer.VisibleInterval
reader io.ReaderAt
}
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
- chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB)
- grace.OnInterrupt(func() {
- chunkCache.Shutdown()
- })
+ chunkCache := chunk_cache.NewTieredChunkCache(256, option.CacheDir, option.CacheSizeMB, 1024*1024)
return &WebDavFileSystem{
option: option,
chunkCache: chunkCache,
+ signature: util.RandomInt32(),
}, nil
}
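NewTieredChunkCache takes an additional on-disk unit size (1 MiB here), and every WebDAV filesystem instance now draws a random signature. Presumably the signature lets metadata subscribers recognize changes that originated here; a hedged sketch of how it is attached to a mutation, with field names taken from the Mkdir hunk below:

    // Sketch only; the stated purpose of signatures is an assumption.
    sig := util.RandomInt32()
    request := &filer_pb.CreateEntryRequest{
        Directory:  "/some/dir", // placeholder
        Entry:      entry,       // placeholder *filer_pb.Entry
        Signatures: []int32{sig},
    }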
@@ -120,8 +118,8 @@ func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient)
}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
}
-func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string {
- return hostAndPort
+func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
+ return location.Url
}
func clearName(name string) (string, error) {
@@ -169,6 +167,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
Gid: fs.option.Gid,
},
},
+ Signatures: []int32{fs.signature},
}
glog.V(1).Infof("mkdir: %v", request)
@@ -220,6 +219,7 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
TtlSec: 0,
},
},
+ Signatures: []int32{fs.signature},
}); err != nil {
return fmt.Errorf("create %s: %v", fullFilePath, err)
}
@@ -259,7 +259,7 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string)
dir, name := util.FullPath(fullFilePath).DirAndName()
- return filer_pb.Remove(fs, dir, name, true, false, false)
+ return filer_pb.Remove(fs, dir, name, true, false, false, false, []int32{fs.signature})
}
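filer_pb.Remove gains one more boolean plus the signatures slice. The parameter names below are my reading from context, not from this diff:

    // Hedged reading of the call above; all names are assumptions.
    isDeleteData := true          // also delete the underlying chunks
    isRecursive := false          // do not descend into subdirectories
    ignoreRecursiveError := false
    isFromOtherCluster := false   // the newly added flag, assumed meaning
    err := filer_pb.Remove(fs, dir, name,
        isDeleteData, isRecursive, ignoreRecursiveError, isFromOtherCluster,
        []int32{fs.signature})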
@@ -338,7 +338,7 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F
if err != nil {
return nil, err
}
- fi.size = int64(filer2.TotalSize(entry.GetChunks()))
+ fi.size = int64(filer.FileSize(entry))
fi.name = string(fullpath)
fi.mode = os.FileMode(entry.Attributes.FileMode)
fi.modifiledTime = time.Unix(entry.Attributes.Mtime, 0)
@@ -387,7 +387,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
Count: 1,
Replication: "",
Collection: f.fs.option.Collection,
- ParentPath: dir,
+ Path: f.name,
}
resp, err := client.AssignVolume(ctx, request)
@@ -426,8 +426,9 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
f.entry.Attributes.Replication = replication
request := &filer_pb.UpdateEntryRequest{
- Directory: dir,
- Entry: f.entry,
+ Directory: dir,
+ Entry: f.entry,
+ Signatures: []int32{f.fs.signature},
}
if _, err := client.UpdateEntry(ctx, request); err != nil {
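Volume assignment is now keyed by the full file path rather than the parent directory, and entry updates carry the client signature. A sketch of the renamed field (values are placeholders; field names are from this diff):

    request := &filer_pb.AssignVolumeRequest{
        Count:       1,
        Replication: "",
        Collection:  f.fs.option.Collection,
        Path:        "/buckets/demo/file.txt", // was ParentPath: dir
    }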
@@ -470,16 +471,17 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
if err != nil {
return 0, err
}
- if len(f.entry.Chunks) == 0 {
+ fileSize := int64(filer.FileSize(f.entry))
+ if fileSize == 0 {
return 0, io.EOF
}
if f.entryViewCache == nil {
- f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks)
+ f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks)
f.reader = nil
}
if f.reader == nil {
- chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32)
- f.reader = filer2.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache)
+ chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
+ f.reader = filer.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache, fileSize)
}
readSize, err = f.reader.ReadAt(p, f.off)
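The read path now resolves visible intervals through a lookup function and sizes the reader with filer.FileSize, so entries whose size comes from attributes rather than chunks still read correctly. The assembled pipeline, restated from the lines above (fs, entry, chunkCache, buf and off are placeholders):

    fileSize := int64(filer.FileSize(entry))
    visibles, _ := filer.NonOverlappingVisibleIntervals(filer.LookupFn(fs), entry.Chunks)
    views := filer.ViewFromVisibleIntervals(visibles, 0, math.MaxInt64)
    reader := filer.NewChunkReaderAtFromClient(fs, views, chunkCache, fileSize)
    n, err := reader.ReadAt(buf, off)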
@@ -487,11 +489,7 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize))
f.off += int64(readSize)
- if err == io.EOF {
- err = nil
- }
-
- if err != nil {
+ if err != nil && err != io.EOF {
glog.Errorf("file read %s: %v", f.name, err)
}
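The simplification leans on the io.ReaderAt contract: a read that reaches end-of-file may return n > 0 together with io.EOF, so EOF is expected rather than a failure. A minimal illustration:

    // io.ReaderAt may legitimately return (n > 0, io.EOF) on the last read.
    n, err := reader.ReadAt(p, off)
    off += int64(n)
    if err != nil && err != io.EOF {
        return n, err // only real failures propagate
    }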
@@ -507,7 +505,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
fi := FileInfo{
- size: int64(filer2.TotalSize(entry.GetChunks())),
+ size: int64(filer.FileSize(entry)),
name: entry.Name,
mode: os.FileMode(entry.Attributes.FileMode),
modifiledTime: time.Unix(entry.Attributes.Mtime, 0),
@@ -550,9 +548,9 @@ func (f *WebDavFile) Seek(offset int64, whence int) (int64, error) {
var err error
switch whence {
- case 0:
+ case io.SeekStart:
f.off = 0
- case 2:
+ case io.SeekEnd:
if fi, err := f.fs.stat(ctx, f.name); err != nil {
return 0, err
} else {