diff options
| author | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
|---|---|---|
| committer | shibinbin <shibinbin@megvii.com> | 2020-06-04 17:24:18 +0800 |
| commit | 40334bc28d3fa694ce59b4e65077efb845264d20 (patch) | |
| tree | a085e2e33851c4d916bef2952abc7cfbfe95ee88 /weed/server | |
| parent | d892cad15d748327c2b7c649f6398ff35d8dce0b (diff) | |
| parent | fbed2e9026b71c810dd86bd826c9e068e93d3c48 (diff) | |
| download | seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.tar.xz seaweedfs-40334bc28d3fa694ce59b4e65077efb845264d20.zip | |
Merge remote-tracking branch 'upstream/master'
Diffstat (limited to 'weed/server')
36 files changed, 1437 insertions, 665 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index 31a9a73b8..bc6008864 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -1,10 +1,11 @@ package weed_server import ( - "bytes" "encoding/json" "errors" "fmt" + "io" + "mime/multipart" "net/http" "path/filepath" "strconv" @@ -45,6 +46,12 @@ func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj inter if err != nil { return } + + if httpStatus >= 400 { + glog.V(0).Infof("response method:%s URL:%s with httpStatus:%d and JSON:%s", + r.Method, r.URL.String(), httpStatus, string(bytes)) + } + callback := r.FormValue("callback") if callback == "" { w.Header().Set("Content-Type", "application/json") @@ -99,13 +106,13 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } debug("parsing upload file...") - fname, data, mimeType, pairMap, isGzipped, originalDataSize, lastModified, _, _, pe := needle.ParseUpload(r, 256*1024*1024) + pu, pe := needle.ParseUpload(r, 256*1024*1024) if pe != nil { writeJsonError(w, r, http.StatusBadRequest, pe) return } - debug("assigning file id for", fname) + debug("assigning file id for", pu.FileName) r.ParseForm() count := uint64(1) if r.FormValue("count") != "" { @@ -129,21 +136,21 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterUrl st } url := "http://" + assignResult.Url + "/" + assignResult.Fid - if lastModified != 0 { - url = url + "?ts=" + strconv.FormatUint(lastModified, 10) + if pu.ModifiedTime != 0 { + url = url + "?ts=" + strconv.FormatUint(pu.ModifiedTime, 10) } debug("upload file to store", url) - uploadResult, err := operation.Upload(url, fname, bytes.NewReader(data), isGzipped, mimeType, pairMap, assignResult.Auth) + uploadResult, err := operation.UploadData(url, pu.FileName, false, pu.Data, pu.IsGzipped, pu.MimeType, pu.PairMap, assignResult.Auth) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return } - m["fileName"] = fname + m["fileName"] = pu.FileName m["fid"] = assignResult.Fid m["fileUrl"] = assignResult.PublicUrl + "/" + assignResult.Fid - m["size"] = originalDataSize + m["size"] = pu.OriginalDataSize m["eTag"] = uploadResult.ETag writeJsonQuiet(w, r, http.StatusCreated, m) return @@ -184,19 +191,19 @@ func parseURLPath(path string) (vid, fid, filename, ext string, isVolumeIdOnly b func statsHealthHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) - m["Version"] = util.VERSION + m["Version"] = util.Version() writeJsonQuiet(w, r, http.StatusOK, m) } func statsCounterHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) - m["Version"] = util.VERSION + m["Version"] = util.Version() m["Counters"] = serverStats writeJsonQuiet(w, r, http.StatusOK, m) } func statsMemoryHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) - m["Version"] = util.VERSION + m["Version"] = util.Version() m["Memory"] = stats.MemStat() writeJsonQuiet(w, r, http.StatusOK, m) } @@ -210,3 +217,107 @@ func handleStaticResources2(r *mux.Router) { r.Handle("/favicon.ico", http.FileServer(statikFS)) r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(statikFS))) } + +func adjustHeadersAfterHEAD(w http.ResponseWriter, r *http.Request, filename string) { + if filename != "" { + contentDisposition := "inline" + if r.FormValue("dl") != "" { + if dl, _ := strconv.ParseBool(r.FormValue("dl")); dl { + contentDisposition = "attachment" + } + } + w.Header().Set("Content-Disposition", contentDisposition+`; filename="`+fileNameEscaper.Replace(filename)+`"`) + } +} + +func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) { + rangeReq := r.Header.Get("Range") + + if rangeReq == "" { + w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) + if err := writeFn(w, 0, totalSize); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + return + } + + //the rest is dealing with partial content request + //mostly copy from src/pkg/net/http/fs.go + ranges, err := parseRange(rangeReq, totalSize) + if err != nil { + http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) + return + } + if sumRangesSize(ranges) > totalSize { + // The total number of bytes in all the ranges + // is larger than the size of the file by + // itself, so this is probably an attack, or a + // dumb client. Ignore the range request. + return + } + if len(ranges) == 0 { + return + } + if len(ranges) == 1 { + // RFC 2616, Section 14.16: + // "When an HTTP message includes the content of a single + // range (for example, a response to a request for a + // single range, or to a request for a set of ranges + // that overlap without any holes), this content is + // transmitted with a Content-Range header, and a + // Content-Length header showing the number of bytes + // actually transferred. + // ... + // A response to a request for a single range MUST NOT + // be sent using the multipart/byteranges media type." + ra := ranges[0] + w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) + w.Header().Set("Content-Range", ra.contentRange(totalSize)) + w.WriteHeader(http.StatusPartialContent) + + err = writeFn(w, ra.start, ra.length) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + return + } + return + } + + // process multiple ranges + for _, ra := range ranges { + if ra.start > totalSize { + http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable) + return + } + } + sendSize := rangesMIMESize(ranges, mimeType, totalSize) + pr, pw := io.Pipe() + mw := multipart.NewWriter(pw) + w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary()) + sendContent := pr + defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish. + go func() { + for _, ra := range ranges { + part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize)) + if e != nil { + pw.CloseWithError(e) + return + } + if e = writeFn(part, ra.start, ra.length); e != nil { + pw.CloseWithError(e) + return + } + } + mw.Close() + pw.Close() + }() + if w.Header().Get("Content-Encoding") == "" { + w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10)) + } + w.WriteHeader(http.StatusPartialContent) + if _, err := io.CopyN(w, sendContent, sendSize); err != nil { + http.Error(w, "Internal Error", http.StatusInternalServerError) + return + } +} diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 03954a58c..901f798f0 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -14,13 +14,16 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/util" ) func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.LookupDirectoryEntryRequest) (*filer_pb.LookupDirectoryEntryResponse, error) { - entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name)))) - if err == filer2.ErrNotFound { - return &filer_pb.LookupDirectoryEntryResponse{}, nil + glog.V(4).Infof("LookupDirectoryEntry %s", filepath.Join(req.Directory, req.Name)) + + entry, err := fs.filer.FindEntry(ctx, util.JoinPath(req.Directory, req.Name)) + if err == filer_pb.ErrNotFound { + return &filer_pb.LookupDirectoryEntryResponse{}, err } if err != nil { glog.V(3).Infof("LookupDirectoryEntry %s: %+v, ", filepath.Join(req.Directory, req.Name), err) @@ -40,6 +43,8 @@ func (fs *FilerServer) LookupDirectoryEntry(ctx context.Context, req *filer_pb.L func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream filer_pb.SeaweedFiler_ListEntriesServer) error { + glog.V(4).Infof("ListEntries %v", req) + limit := int(req.Limit) if limit == 0 { limit = fs.option.DirListingLimit @@ -53,7 +58,8 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file lastFileName := req.StartFromFileName includeLastFile := req.InclusiveStartFrom for limit > 0 { - entries, err := fs.filer.ListDirectoryEntries(stream.Context(), filer2.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit) + entries, err := fs.filer.ListDirectoryEntries(stream.Context(), util.FullPath(req.Directory), lastFileName, includeLastFile, paginationLimit) + if err != nil { return err } @@ -84,6 +90,7 @@ func (fs *FilerServer) ListEntries(req *filer_pb.ListEntriesRequest, stream file }); err != nil { return err } + limit-- if limit == 0 { return nil @@ -132,9 +139,10 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntryRequest) (resp *filer_pb.CreateEntryResponse, err error) { + glog.V(4).Infof("CreateEntry %v", req) + resp = &filer_pb.CreateEntryResponse{} - fullpath := filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))) chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) if req.Entry.Attributes == nil { @@ -144,7 +152,7 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr } createErr := fs.filer.CreateEntry(ctx, &filer2.Entry{ - FullPath: fullpath, + FullPath: util.JoinPath(req.Directory, req.Entry.Name), Attr: filer2.PbToEntryAttribute(req.Entry.Attributes), Chunks: chunks, }, req.OExcl) @@ -161,8 +169,10 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntryRequest) (*filer_pb.UpdateEntryResponse, error) { - fullpath := filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name)) - entry, err := fs.filer.FindEntry(ctx, filer2.FullPath(fullpath)) + glog.V(4).Infof("UpdateEntry %v", req) + + fullpath := util.Join(req.Directory, req.Entry.Name) + entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath)) if err != nil { return &filer_pb.UpdateEntryResponse{}, fmt.Errorf("not found %s: %v", fullpath, err) } @@ -173,7 +183,7 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr chunks, garbages := filer2.CompactFileChunks(req.Entry.Chunks) newEntry := &filer2.Entry{ - FullPath: filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Entry.Name))), + FullPath: util.JoinPath(req.Directory, req.Entry.Name), Attr: entry.Attr, Extended: req.Entry.Extended, Chunks: chunks, @@ -215,9 +225,50 @@ func (fs *FilerServer) UpdateEntry(ctx context.Context, req *filer_pb.UpdateEntr return &filer_pb.UpdateEntryResponse{}, err } +func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendToEntryRequest) (*filer_pb.AppendToEntryResponse, error) { + + glog.V(4).Infof("AppendToEntry %v", req) + + fullpath := util.NewFullPath(req.Directory, req.EntryName) + var offset int64 = 0 + entry, err := fs.filer.FindEntry(ctx, util.FullPath(fullpath)) + if err == filer_pb.ErrNotFound { + entry = &filer2.Entry{ + FullPath: fullpath, + Attr: filer2.Attr{ + Crtime: time.Now(), + Mtime: time.Now(), + Mode: os.FileMode(0644), + Uid: OS_UID, + Gid: OS_GID, + }, + } + } else { + offset = int64(filer2.TotalSize(entry.Chunks)) + } + + for _, chunk := range req.Chunks { + chunk.Offset = offset + offset += int64(chunk.Size) + } + + entry.Chunks = append(entry.Chunks, req.Chunks...) + + err = fs.filer.CreateEntry(context.Background(), entry, false) + + return &filer_pb.AppendToEntryResponse{}, err +} + func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntryRequest) (resp *filer_pb.DeleteEntryResponse, err error) { - err = fs.filer.DeleteEntryMetaAndData(ctx, filer2.FullPath(filepath.ToSlash(filepath.Join(req.Directory, req.Name))), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData) - return &filer_pb.DeleteEntryResponse{}, err + + glog.V(4).Infof("DeleteEntry %v", req) + + err = fs.filer.DeleteEntryMetaAndData(ctx, util.JoinPath(req.Directory, req.Name), req.IsRecursive, req.IgnoreRecursiveError, req.IsDeleteData) + resp = &filer_pb.DeleteEntryResponse{} + if err != nil { + resp.Error = err.Error() + } + return resp, nil } func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) { @@ -226,6 +277,7 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol if req.TtlSec > 0 { ttlStr = strconv.Itoa(int(req.TtlSec)) } + collection, replication, _ := fs.detectCollection(req.ParentPath, req.Collection, req.Replication) var altRequest *operation.VolumeAssignRequest @@ -236,16 +288,16 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol assignRequest := &operation.VolumeAssignRequest{ Count: uint64(req.Count), - Replication: req.Replication, - Collection: req.Collection, + Replication: replication, + Collection: collection, Ttl: ttlStr, DataCenter: dataCenter, } if dataCenter != "" { altRequest = &operation.VolumeAssignRequest{ Count: uint64(req.Count), - Replication: req.Replication, - Collection: req.Collection, + Replication: replication, + Collection: collection, Ttl: ttlStr, DataCenter: "", } @@ -253,26 +305,30 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol assignResult, err := operation.Assign(fs.filer.GetMaster(), fs.grpcDialOption, assignRequest, altRequest) if err != nil { glog.V(3).Infof("AssignVolume: %v", err) - return nil, fmt.Errorf("assign volume: %v", err) + return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil } if assignResult.Error != "" { glog.V(3).Infof("AssignVolume error: %v", assignResult.Error) - return nil, fmt.Errorf("assign volume result: %v", assignResult.Error) + return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume result: %v", assignResult.Error)}, nil } return &filer_pb.AssignVolumeResponse{ - FileId: assignResult.Fid, - Count: int32(assignResult.Count), - Url: assignResult.Url, - PublicUrl: assignResult.PublicUrl, - Auth: string(assignResult.Auth), - }, err + FileId: assignResult.Fid, + Count: int32(assignResult.Count), + Url: assignResult.Url, + PublicUrl: assignResult.PublicUrl, + Auth: string(assignResult.Auth), + Collection: collection, + Replication: replication, + }, nil } func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.DeleteCollectionRequest) (resp *filer_pb.DeleteCollectionResponse, err error) { - err = fs.filer.MasterClient.WithClient(ctx, func(client master_pb.SeaweedClient) error { - _, err := client.CollectionDelete(ctx, &master_pb.CollectionDeleteRequest{ + glog.V(4).Infof("DeleteCollection %v", req) + + err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: req.GetCollection(), }) return err @@ -283,13 +339,22 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) { - input := &master_pb.StatisticsRequest{ - Replication: req.Replication, - Collection: req.Collection, - Ttl: req.Ttl, - } + var output *master_pb.StatisticsResponse + + err = fs.filer.MasterClient.WithClient(func(masterClient master_pb.SeaweedClient) error { + grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{ + Replication: req.Replication, + Collection: req.Collection, + Ttl: req.Ttl, + }) + if grpcErr != nil { + return grpcErr + } + + output = grpcResponse + return nil + }) - output, err := operation.Statistics(fs.filer.GetMaster(), fs.grpcDialOption, input) if err != nil { return nil, err } @@ -303,10 +368,88 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) { - return &filer_pb.GetFilerConfigurationResponse{ + t := &filer_pb.GetFilerConfigurationResponse{ Masters: fs.option.Masters, Collection: fs.option.Collection, Replication: fs.option.DefaultReplication, MaxMb: uint32(fs.option.MaxMB), - }, nil + DirBuckets: fs.filer.DirBucketsPath, + Cipher: fs.filer.Cipher, + } + + glog.V(4).Infof("GetFilerConfiguration: %v", t) + + return t, nil +} + +func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error { + + req, err := stream.Recv() + if err != nil { + return err + } + + clientName := fmt.Sprintf("%s:%d", req.Name, req.GrpcPort) + m := make(map[string]bool) + for _, tp := range req.Resources { + m[tp] = true + } + fs.brokersLock.Lock() + fs.brokers[clientName] = m + glog.V(0).Infof("+ broker %v", clientName) + fs.brokersLock.Unlock() + + defer func() { + fs.brokersLock.Lock() + delete(fs.brokers, clientName) + glog.V(0).Infof("- broker %v: %v", clientName, err) + fs.brokersLock.Unlock() + }() + + for { + if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil { + glog.V(0).Infof("send broker %v: %+v", clientName, err) + return err + } + // println("replied") + + if _, err := stream.Recv(); err != nil { + glog.V(0).Infof("recv broker %v: %v", clientName, err) + return err + } + // println("received") + } + +} + +func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) { + + resp = &filer_pb.LocateBrokerResponse{} + + fs.brokersLock.Lock() + defer fs.brokersLock.Unlock() + + var localBrokers []*filer_pb.LocateBrokerResponse_Resource + + for b, m := range fs.brokers { + if _, found := m[req.Resource]; found { + resp.Found = true + resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{ + { + GrpcAddresses: b, + ResourceCount: int32(len(m)), + }, + } + return + } + localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{ + GrpcAddresses: b, + ResourceCount: int32(len(m)), + }) + } + + resp.Resources = localBrokers + + return resp, nil + } diff --git a/weed/server/filer_grpc_server_listen.go b/weed/server/filer_grpc_server_listen.go new file mode 100644 index 000000000..848a1fc3a --- /dev/null +++ b/weed/server/filer_grpc_server_listen.go @@ -0,0 +1,108 @@ +package weed_server + +import ( + "fmt" + "strings" + "time" + + "github.com/golang/protobuf/proto" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, stream filer_pb.SeaweedFiler_SubscribeMetadataServer) error { + + peerAddress := findClientAddress(stream.Context(), 0) + + clientName := fs.addClient(req.ClientName, peerAddress) + + defer fs.deleteClient(clientName) + + lastReadTime := time.Unix(0, req.SinceNs) + glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) + var processedTsNs int64 + + eachEventNotificationFn := func(dirPath string, eventNotification *filer_pb.EventNotification, tsNs int64) error { + + // get complete path to the file or directory + var entryName string + if eventNotification.OldEntry != nil { + entryName = eventNotification.OldEntry.Name + } else if eventNotification.NewEntry != nil { + entryName = eventNotification.NewEntry.Name + } + + fullpath := util.Join(dirPath, entryName) + + // skip on filer internal meta logs + if strings.HasPrefix(fullpath, filer2.SystemLogDir) { + return nil + } + + if !strings.HasPrefix(fullpath, req.PathPrefix) { + return nil + } + + message := &filer_pb.SubscribeMetadataResponse{ + Directory: dirPath, + EventNotification: eventNotification, + TsNs: tsNs, + } + if err := stream.Send(message); err != nil { + glog.V(0).Infof("=> client %v: %+v", clientName, err) + return err + } + return nil + } + + eachLogEntryFn := func(logEntry *filer_pb.LogEntry) error { + event := &filer_pb.SubscribeMetadataResponse{} + if err := proto.Unmarshal(logEntry.Data, event); err != nil { + glog.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + return fmt.Errorf("unexpected unmarshal filer_pb.SubscribeMetadataResponse: %v", err) + } + + if err := eachEventNotificationFn(event.Directory, event.EventNotification, event.TsNs); err != nil { + return err + } + + processedTsNs = logEntry.TsNs + + return nil + } + + if err := fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn); err != nil { + return fmt.Errorf("reading from persisted logs: %v", err) + } + + if processedTsNs != 0 { + lastReadTime = time.Unix(0, processedTsNs) + } + + err := fs.filer.MetaLogBuffer.LoopProcessLogData(lastReadTime, func() bool { + fs.listenersLock.Lock() + fs.listenersCond.Wait() + fs.listenersLock.Unlock() + return true + }, eachLogEntryFn) + + return err + +} + +func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) { + clientName = clientType + "@" + clientAddress + glog.V(0).Infof("+ listener %v", clientName) + return +} + +func (fs *FilerServer) deleteClient(clientName string) { + glog.V(0).Infof("- listener %v", clientName) +} + +func (fs *FilerServer) notifyMetaListeners() { + fs.listenersCond.Broadcast() +} diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 0669a26f1..71e2c7d17 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -3,10 +3,12 @@ package weed_server import ( "context" "fmt" + "path/filepath" + "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "path/filepath" + "github.com/chrislusf/seaweedfs/weed/util" ) func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.AtomicRenameEntryRequest) (*filer_pb.AtomicRenameEntryResponse, error) { @@ -18,7 +20,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom return nil, err } - oldParent := filer2.FullPath(filepath.ToSlash(req.OldDirectory)) + oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory)) oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName)) if err != nil { @@ -27,7 +29,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom } var events MoveEvents - moveErr := fs.moveEntry(ctx, oldParent, oldEntry, filer2.FullPath(filepath.ToSlash(req.NewDirectory)), req.NewName, &events) + moveErr := fs.moveEntry(ctx, oldParent, oldEntry, util.FullPath(filepath.ToSlash(req.NewDirectory)), req.NewName, &events) if moveErr != nil { fs.filer.RollbackTransaction(ctx) return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, err) @@ -38,26 +40,26 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom } } - for _, entry := range events.newEntries { - fs.filer.NotifyUpdateEvent(nil, entry, false) - } - for _, entry := range events.oldEntries { - fs.filer.NotifyUpdateEvent(entry, nil, false) - } - return &filer_pb.AtomicRenameEntryResponse{}, nil } -func (fs *FilerServer) moveEntry(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error { - if entry.IsDirectory() { - if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, events); err != nil { - return err +func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents) error { + + if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events, func() error { + if entry.IsDirectory() { + if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, events); err != nil { + return err + } } + return nil + }); err != nil { + return fmt.Errorf("fail to move %s => %s: %v", oldParent.Child(entry.Name()), newParent.Child(newName), err) } - return fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, events) + + return nil } -func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error { +func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents) error { currentDirPath := oldParent.Child(entry.Name()) newDirPath := newParent.Child(newName) @@ -90,7 +92,8 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent filer return nil } -func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullPath, entry *filer2.Entry, newParent filer2.FullPath, newName string, events *MoveEvents) error { +func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer2.Entry, newParent util.FullPath, newName string, events *MoveEvents, + moveFolderSubEntries func() error) error { oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName) @@ -112,6 +115,14 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullP return createErr } + events.newEntries = append(events.newEntries, newEntry) + + if moveFolderSubEntries != nil { + if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil { + return moveChildrenErr + } + } + // delete old entry deleteErr := fs.filer.DeleteEntryMetaAndData(ctx, oldPath, false, false, false) if deleteErr != nil { @@ -119,7 +130,7 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent filer2.FullP } events.oldEntries = append(events.oldEntries, entry) - events.newEntries = append(events.newEntries, newEntry) + return nil } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 72cca1f6f..10b607dfe 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -5,11 +5,15 @@ import ( "fmt" "net/http" "os" + "sync" "time" "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/util/grace" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" @@ -19,10 +23,11 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer2/etcd" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb" _ "github.com/chrislusf/seaweedfs/weed/filer2/leveldb2" + _ "github.com/chrislusf/seaweedfs/weed/filer2/mongodb" _ "github.com/chrislusf/seaweedfs/weed/filer2/mysql" _ "github.com/chrislusf/seaweedfs/weed/filer2/postgres" _ "github.com/chrislusf/seaweedfs/weed/filer2/redis" - _ "github.com/chrislusf/seaweedfs/weed/filer2/tikv" + _ "github.com/chrislusf/seaweedfs/weed/filer2/redis2" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" @@ -37,15 +42,16 @@ type FilerOption struct { Masters []string Collection string DefaultReplication string - RedirectOnRead bool DisableDirListing bool MaxMB int DirListingLimit int DataCenter string DefaultLevelDbDir string DisableHttp bool - Port int + Host string + Port uint32 recursiveDelete bool + Cipher bool } type FilerServer struct { @@ -53,6 +59,13 @@ type FilerServer struct { secret security.SigningKey filer *filer2.Filer grpcDialOption grpc.DialOption + + // notifying clients + listenersLock sync.Mutex + listenersCond *sync.Cond + + brokers map[string]map[string]bool + brokersLock sync.Mutex } func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { @@ -60,13 +73,18 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) fs = &FilerServer{ option: option, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), + brokers: make(map[string]map[string]bool), } + fs.listenersCond = sync.NewCond(&fs.listenersLock) if len(option.Masters) == 0 { glog.Fatal("master list is required!") } - fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption) + fs.filer = filer2.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, fs.notifyMetaListeners) + fs.filer.Cipher = option.Cipher + + maybeStartMetrics(fs, option) go fs.filer.KeepConnectedToMaster() @@ -82,6 +100,9 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) util.LoadConfiguration("notification", false) fs.option.recursiveDelete = v.GetBool("filer.options.recursive_delete") + v.SetDefault("filer.options.buckets_folder", "/buckets") + fs.filer.DirBucketsPath = v.GetString("filer.options.buckets_folder") + fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync") fs.filer.LoadConfiguration(v) notification.LoadConfiguration(v, "notification.") @@ -94,22 +115,36 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - maybeStartMetrics(fs, option) + fs.filer.LoadBuckets() + + grace.OnInterrupt(func() { + fs.filer.Shutdown() + }) return fs, nil } func maybeStartMetrics(fs *FilerServer, option *FilerOption) { + + for _, master := range option.Masters { + _, err := pb.ParseFilerGrpcAddress(master) + if err != nil { + glog.Fatalf("invalid master address %s: %v", master, err) + } + } + isConnected := false var metricsAddress string var metricsIntervalSec int var readErr error for !isConnected { - metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, option.Masters[0]) - if readErr == nil { - isConnected = true - } else { - time.Sleep(7 * time.Second) + for _, master := range option.Masters { + metricsAddress, metricsIntervalSec, readErr = readFilerConfiguration(fs.grpcDialOption, master) + if readErr == nil { + isConnected = true + } else { + time.Sleep(7 * time.Second) + } } } if metricsAddress == "" && metricsIntervalSec <= 0 { @@ -121,11 +156,11 @@ func maybeStartMetrics(fs *FilerServer, option *FilerOption) { }) } -func readFilerConfiguration(grpcDialOption grpc.DialOption, masterGrpcAddress string) (metricsAddress string, metricsIntervalSec int, err error) { - err = operation.WithMasterServerClient(masterGrpcAddress, grpcDialOption, func(ctx context.Context, masterClient master_pb.SeaweedClient) error { - resp, err := masterClient.GetMasterConfiguration(ctx, &master_pb.GetMasterConfigurationRequest{}) +func readFilerConfiguration(grpcDialOption grpc.DialOption, masterAddress string) (metricsAddress string, metricsIntervalSec int, err error) { + err = operation.WithMasterServerClient(masterAddress, grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { - return fmt.Errorf("get master %s configuration: %v", masterGrpcAddress, err) + return fmt.Errorf("get master %s configuration: %v", masterAddress, err) } metricsAddress, metricsIntervalSec = resp.MetricsAddress, int(resp.MetricsIntervalSeconds) return nil diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index ba21298ba..76c924df1 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -1,19 +1,20 @@ package weed_server import ( + "bytes" "context" "io" - "io/ioutil" "mime" - "mime/multipart" "net/http" - "net/url" - "path" + "path/filepath" "strconv" "strings" + "time" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/images" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -26,13 +27,13 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, path = path[:len(path)-1] } - entry, err := fs.filer.FindEntry(context.Background(), filer2.FullPath(path)) + entry, err := fs.filer.FindEntry(context.Background(), util.FullPath(path)) if err != nil { if path == "/" { fs.listDirectoryHandler(w, r) return } - if err == filer2.ErrNotFound { + if err == filer_pb.ErrNotFound { glog.V(1).Infof("Not found %s: %v", path, err) stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc() w.WriteHeader(http.StatusNotFound) @@ -66,188 +67,68 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request, } w.Header().Set("Accept-Ranges", "bytes") - if r.Method == "HEAD" { - w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10)) - w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat)) - setEtag(w, filer2.ETag(entry.Chunks)) - return - } - - if len(entry.Chunks) == 1 { - fs.handleSingleChunk(w, r, entry) - return - } - - fs.handleMultipleChunks(w, r, entry) - -} - -func (fs *FilerServer) handleSingleChunk(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) { - - fileId := entry.Chunks[0].GetFileIdString() - - urlString, err := fs.filer.MasterClient.LookupFileId(fileId) - if err != nil { - glog.V(1).Infof("operation LookupFileId %s failed, err: %v", fileId, err) - w.WriteHeader(http.StatusNotFound) - return - } - - if fs.option.RedirectOnRead { - stats.FilerRequestCounter.WithLabelValues("redirect").Inc() - http.Redirect(w, r, urlString, http.StatusFound) - return - } - - u, _ := url.Parse(urlString) - q := u.Query() - for key, values := range r.URL.Query() { - for _, value := range values { - q.Add(key, value) - } - } - u.RawQuery = q.Encode() - request := &http.Request{ - Method: r.Method, - URL: u, - Proto: r.Proto, - ProtoMajor: r.ProtoMajor, - ProtoMinor: r.ProtoMinor, - Header: r.Header, - Body: r.Body, - Host: r.Host, - ContentLength: r.ContentLength, - } - glog.V(3).Infoln("retrieving from", u) - resp, do_err := util.Do(request) - if do_err != nil { - glog.V(0).Infoln("failing to connect to volume server", do_err.Error()) - writeJsonError(w, r, http.StatusInternalServerError, do_err) - return - } - defer func() { - io.Copy(ioutil.Discard, resp.Body) - resp.Body.Close() - }() - for k, v := range resp.Header { - w.Header()[k] = v - } - if entry.Attr.Mime != "" { - w.Header().Set("Content-Type", entry.Attr.Mime) - } - w.WriteHeader(resp.StatusCode) - io.Copy(w, resp.Body) -} - -func (fs *FilerServer) handleMultipleChunks(w http.ResponseWriter, r *http.Request, entry *filer2.Entry) { + w.Header().Set("Last-Modified", entry.Attr.Mtime.Format(http.TimeFormat)) + // mime type mimeType := entry.Attr.Mime if mimeType == "" { - if ext := path.Ext(entry.Name()); ext != "" { + if ext := filepath.Ext(entry.Name()); ext != "" { mimeType = mime.TypeByExtension(ext) } } if mimeType != "" { w.Header().Set("Content-Type", mimeType) } - setEtag(w, filer2.ETag(entry.Chunks)) - totalSize := int64(filer2.TotalSize(entry.Chunks)) - - rangeReq := r.Header.Get("Range") - - if rangeReq == "" { - w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) - if err := fs.writeContent(w, entry, 0, int(totalSize)); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return + // if modified since + if !entry.Attr.Mtime.IsZero() { + w.Header().Set("Last-Modified", entry.Attr.Mtime.UTC().Format(http.TimeFormat)) + if r.Header.Get("If-Modified-Since") != "" { + if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil { + if t.After(entry.Attr.Mtime) { + w.WriteHeader(http.StatusNotModified) + return + } + } } - return } - //the rest is dealing with partial content request - //mostly copy from src/pkg/net/http/fs.go - ranges, err := parseRange(rangeReq, totalSize) - if err != nil { - http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) + // set etag + etag := filer2.ETagEntry(entry) + if inm := r.Header.Get("If-None-Match"); inm == "\""+etag+"\"" { + w.WriteHeader(http.StatusNotModified) return } - if sumRangesSize(ranges) > totalSize { - // The total number of bytes in all the ranges - // is larger than the size of the file by - // itself, so this is probably an attack, or a - // dumb client. Ignore the range request. - return - } - if len(ranges) == 0 { - return - } - if len(ranges) == 1 { - // RFC 2616, Section 14.16: - // "When an HTTP message includes the content of a single - // range (for example, a response to a request for a - // single range, or to a request for a set of ranges - // that overlap without any holes), this content is - // transmitted with a Content-Range header, and a - // Content-Length header showing the number of bytes - // actually transferred. - // ... - // A response to a request for a single range MUST NOT - // be sent using the multipart/byteranges media type." - ra := ranges[0] - w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) - w.Header().Set("Content-Range", ra.contentRange(totalSize)) - w.WriteHeader(http.StatusPartialContent) - - err = fs.writeContent(w, entry, ra.start, int(ra.length)) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - return - } + setEtag(w, etag) + + if r.Method == "HEAD" { + w.Header().Set("Content-Length", strconv.FormatInt(int64(filer2.TotalSize(entry.Chunks)), 10)) return } - // process multiple ranges - for _, ra := range ranges { - if ra.start > totalSize { - http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable) - return - } - } - sendSize := rangesMIMESize(ranges, mimeType, totalSize) - pr, pw := io.Pipe() - mw := multipart.NewWriter(pw) - w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary()) - sendContent := pr - defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish. - go func() { - for _, ra := range ranges { - part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize)) - if e != nil { - pw.CloseWithError(e) - return - } - if e = fs.writeContent(part, entry, ra.start, int(ra.length)); e != nil { - pw.CloseWithError(e) + filename := entry.Name() + adjustHeadersAfterHEAD(w, r, filename) + + totalSize := int64(filer2.TotalSize(entry.Chunks)) + + if rangeReq := r.Header.Get("Range"); rangeReq == "" { + ext := filepath.Ext(filename) + width, height, mode, shouldResize := shouldResizeImages(ext, r) + if shouldResize { + data, err := filer2.ReadAll(fs.filer.MasterClient, entry.Chunks) + if err != nil { + glog.Errorf("failed to read %s: %v", path, err) + w.WriteHeader(http.StatusNotModified) return } + rs, _, _ := images.Resized(ext, bytes.NewReader(data), width, height, mode) + io.Copy(w, rs) + return } - mw.Close() - pw.Close() - }() - if w.Header().Get("Content-Encoding") == "" { - w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10)) - } - w.WriteHeader(http.StatusPartialContent) - if _, err := io.CopyN(w, sendContent, sendSize); err != nil { - http.Error(w, "Internal Error", http.StatusInternalServerError) - return } -} - -func (fs *FilerServer) writeContent(w io.Writer, entry *filer2.Entry, offset int64, size int) error { - - return filer2.StreamContent(fs.filer.MasterClient, w, entry.Chunks, offset, size) + processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error { + return filer2.StreamContent(fs.filer.MasterClient, writer, entry.Chunks, offset, size) + }) } diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index 87e864559..ae28fc1db 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -6,10 +6,10 @@ import ( "strconv" "strings" - "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" ui "github.com/chrislusf/seaweedfs/weed/server/filer_ui" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/util" ) // listDirectoryHandler lists directories and folers under a directory @@ -32,7 +32,7 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque lastFileName := r.FormValue("lastFileName") - entries, err := fs.filer.ListDirectoryEntries(context.Background(), filer2.FullPath(path), lastFileName, false, limit) + entries, err := fs.filer.ListDirectoryEntries(context.Background(), util.FullPath(path), lastFileName, false, limit) if err != nil { glog.V(0).Infof("listDirectory %s %s %d: %s", path, lastFileName, limit, err) diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 4707f1011..74a558e22 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -2,6 +2,7 @@ package weed_server import ( "context" + "crypto/md5" "encoding/json" "errors" "fmt" @@ -22,6 +23,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/stats" + "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -38,7 +40,7 @@ type FilerPostResult struct { Url string `json:"url,omitempty"` } -func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection string, dataCenter string) (fileId, urlLocation string, auth security.EncodedJwt, err error) { +func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, replication, collection, dataCenter, ttlString string, fsync bool) (fileId, urlLocation string, auth security.EncodedJwt, err error) { stats.FilerRequestCounter.WithLabelValues("assign").Inc() start := time.Now() @@ -48,7 +50,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, Count: 1, Replication: replication, Collection: collection, - Ttl: r.URL.Query().Get("ttl"), + Ttl: ttlString, DataCenter: dataCenter, } var altRequest *operation.VolumeAssignRequest @@ -57,7 +59,7 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, Count: 1, Replication: replication, Collection: collection, - Ttl: r.URL.Query().Get("ttl"), + Ttl: ttlString, DataCenter: "", } } @@ -71,6 +73,9 @@ func (fs *FilerServer) assignNewFileInfo(w http.ResponseWriter, r *http.Request, } fileId = assignResult.Fid urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid + if fsync { + urlLocation += "?fsync=true" + } auth = assignResult.Auth return } @@ -80,50 +85,52 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { ctx := context.Background() query := r.URL.Query() - replication := query.Get("replication") - if replication == "" { - replication = fs.option.DefaultReplication - } - collection := query.Get("collection") - if collection == "" { - collection = fs.option.Collection - } + collection, replication, fsync := fs.detectCollection(r.RequestURI, query.Get("collection"), query.Get("replication")) dataCenter := query.Get("dataCenter") if dataCenter == "" { dataCenter = fs.option.DataCenter } + ttlString := r.URL.Query().Get("ttl") + + // read ttl in seconds + ttl, err := needle.ReadTTL(ttlString) + ttlSeconds := int32(0) + if err == nil { + ttlSeconds = int32(ttl.Minutes()) * 60 + } + + if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync); autoChunked { + return + } + + if fs.option.Cipher { + reply, err := fs.encrypt(ctx, w, r, replication, collection, dataCenter, ttlSeconds, ttlString, fsync) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, err) + } else if reply != nil { + writeJsonQuiet(w, r, http.StatusCreated, reply) + } - if autoChunked := fs.autoChunk(ctx, w, r, replication, collection, dataCenter); autoChunked { return } - fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter) + fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) if err != nil || fileId == "" || urlLocation == "" { glog.V(0).Infof("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) + writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter)) return } glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation) u, _ := url.Parse(urlLocation) - - // This allows a client to generate a chunk manifest and submit it to the filer -- it is a little off - // because they need to provide FIDs instead of file paths... - cm, _ := strconv.ParseBool(query.Get("cm")) - if cm { - q := u.Query() - q.Set("cm", "true") - u.RawQuery = q.Encode() - } - glog.V(4).Infoln("post to", u) - - ret, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) + ret, md5value, err := fs.uploadToVolumeServer(r, u, auth, w, fileId) if err != nil { return } - if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, fileId); err != nil { + if err = fs.updateFilerStore(ctx, r, w, replication, collection, ret, md5value, fileId, ttlSeconds); err != nil { return } @@ -140,8 +147,8 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request) { } // update metadata in filer store -func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, - replication string, collection string, ret operation.UploadResult, fileId string) (err error) { +func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w http.ResponseWriter, replication string, + collection string, ret *operation.UploadResult, md5value []byte, fileId string, ttlSeconds int32) (err error) { stats.FilerRequestCounter.WithLabelValues("postStoreWrite").Inc() start := time.Now() @@ -165,13 +172,13 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w path += ret.Name } } - existingEntry, err := fs.filer.FindEntry(ctx, filer2.FullPath(path)) + existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path)) crTime := time.Now() if err == nil && existingEntry != nil { crTime = existingEntry.Crtime } entry := &filer2.Entry{ - FullPath: filer2.FullPath(path), + FullPath: util.FullPath(path), Attr: filer2.Attr{ Mtime: time.Now(), Crtime: crTime, @@ -180,7 +187,9 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w Gid: OS_GID, Replication: replication, Collection: collection, - TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)), + TtlSec: ttlSeconds, + Mime: ret.Mime, + Md5: md5value, }, Chunks: []*filer_pb.FileChunk{{ FileId: fileId, @@ -189,8 +198,10 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w ETag: ret.ETag, }}, } - if ext := filenamePath.Ext(path); ext != "" { - entry.Attr.Mime = mime.TypeByExtension(ext) + if entry.Attr.Mime == "" { + if ext := filenamePath.Ext(path); ext != "" { + entry.Attr.Mime = mime.TypeByExtension(ext) + } } // glog.V(4).Infof("saving %s => %+v", path, entry) if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil { @@ -205,12 +216,21 @@ func (fs *FilerServer) updateFilerStore(ctx context.Context, r *http.Request, w } // send request to volume server -func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret operation.UploadResult, err error) { +func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth security.EncodedJwt, w http.ResponseWriter, fileId string) (ret *operation.UploadResult, md5value []byte, err error) { stats.FilerRequestCounter.WithLabelValues("postUpload").Inc() start := time.Now() defer func() { stats.FilerRequestHistogram.WithLabelValues("postUpload").Observe(time.Since(start).Seconds()) }() + ret = &operation.UploadResult{} + + md5Hash := md5.New() + body := r.Body + if r.Method == "PUT" { + // only PUT or large chunked files has Md5 in attributes + body = ioutil.NopCloser(io.TeeReader(r.Body, md5Hash)) + } + request := &http.Request{ Method: r.Method, URL: u, @@ -218,10 +238,11 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se ProtoMajor: r.ProtoMajor, ProtoMinor: r.ProtoMinor, Header: r.Header, - Body: r.Body, + Body: body, Host: r.Host, ContentLength: r.ContentLength, } + if auth != "" { request.Header.Set("Authorization", "BEARER "+string(auth)) } @@ -236,7 +257,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se io.Copy(ioutil.Discard, resp.Body) resp.Body.Close() }() - etag := resp.Header.Get("ETag") + respBody, raErr := ioutil.ReadAll(resp.Body) if raErr != nil { glog.V(0).Infoln("failing to upload to volume server", r.RequestURI, raErr.Error()) @@ -244,6 +265,7 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se err = raErr return } + glog.V(4).Infoln("post result", string(respBody)) unmarshalErr := json.Unmarshal(respBody, &ret) if unmarshalErr != nil { @@ -271,9 +293,11 @@ func (fs *FilerServer) uploadToVolumeServer(r *http.Request, u *url.URL, auth se return } } - if etag != "" { - ret.ETag = etag + // use filer calculated md5 ETag, instead of the volume server crc ETag + if r.Method == "PUT" { + md5value = md5Hash.Sum(nil) } + ret.ETag = getEtag(resp) return } @@ -292,11 +316,11 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { ignoreRecursiveError := r.FormValue("ignoreRecursiveError") == "true" skipChunkDeletion := r.FormValue("skipChunkDeletion") == "true" - err := fs.filer.DeleteEntryMetaAndData(context.Background(), filer2.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, !skipChunkDeletion) + err := fs.filer.DeleteEntryMetaAndData(context.Background(), util.FullPath(r.URL.Path), isRecursive, ignoreRecursiveError, !skipChunkDeletion) if err != nil { glog.V(1).Infoln("deleting", r.URL.Path, ":", err.Error()) httpStatus := http.StatusInternalServerError - if err == filer2.ErrNotFound { + if err == filer_pb.ErrNotFound { httpStatus = http.StatusNotFound } writeJsonError(w, r, httpStatus, err) @@ -305,3 +329,32 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } + +func (fs *FilerServer) detectCollection(requestURI, qCollection, qReplication string) (collection, replication string, fsync bool) { + // default + collection = fs.option.Collection + replication = fs.option.DefaultReplication + + // get default collection settings + if qCollection != "" { + collection = qCollection + } + if qReplication != "" { + replication = qReplication + } + + // required by buckets folder + if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") { + bucketAndObjectKey := requestURI[len(fs.filer.DirBucketsPath)+1:] + t := strings.Index(bucketAndObjectKey, "/") + if t < 0 { + collection = bucketAndObjectKey + } + if t > 0 { + collection = bucketAndObjectKey[:t] + } + replication, fsync = fs.filer.ReadBucketOption(collection) + } + + return +} diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index 25c0a4b4d..532693742 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -2,7 +2,9 @@ package weed_server import ( "context" + "crypto/md5" "io" + "io/ioutil" "net/http" "path" "strconv" @@ -19,7 +21,7 @@ import ( ) func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - replication string, collection string, dataCenter string) bool { + replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) bool { if r.Method != "POST" { glog.V(4).Infoln("AutoChunking not supported for method", r.Method) return false @@ -55,7 +57,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * return false } - reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter) + reply, err := fs.doAutoChunk(ctx, w, r, contentLength, chunkSize, replication, collection, dataCenter, ttlSec, ttlString, fsync) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else if reply != nil { @@ -65,7 +67,7 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * } func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, - contentLength int64, chunkSize int32, replication string, collection string, dataCenter string) (filerResult *FilerPostResult, replyerr error) { + contentLength int64, chunkSize int32, replication string, collection string, dataCenter string, ttlSec int32, ttlString string, fsync bool) (filerResult *FilerPostResult, replyerr error) { stats.FilerRequestCounter.WithLabelValues("postAutoChunk").Inc() start := time.Now() @@ -87,50 +89,47 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r if fileName != "" { fileName = path.Base(fileName) } + contentType := part1.Header.Get("Content-Type") var fileChunks []*filer_pb.FileChunk + md5Hash := md5.New() + var partReader = ioutil.NopCloser(io.TeeReader(part1, md5Hash)) + chunkOffset := int64(0) for chunkOffset < contentLength { - limitedReader := io.LimitReader(part1, int64(chunkSize)) + limitedReader := io.LimitReader(partReader, int64(chunkSize)) // assign one file id for one chunk - fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter) + fileId, urlLocation, auth, assignErr := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) if assignErr != nil { return nil, assignErr } // upload the chunk to the volume server - chunkName := fileName + "_chunk_" + strconv.FormatInt(int64(len(fileChunks)+1), 10) - uploadedSize, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, chunkName, "", fileId, auth) + uploadResult, uploadErr := fs.doUpload(urlLocation, w, r, limitedReader, fileName, contentType, nil, auth) if uploadErr != nil { return nil, uploadErr } // if last chunk exhausted the reader exactly at the border - if uploadedSize == 0 { + if uploadResult.Size == 0 { break } // Save to chunk manifest structure - fileChunks = append(fileChunks, - &filer_pb.FileChunk{ - FileId: fileId, - Offset: chunkOffset, - Size: uint64(uploadedSize), - Mtime: time.Now().UnixNano(), - }, - ) + fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset)) + + glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size), contentLength) - glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d) of %d", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadedSize), contentLength) + // reset variables for the next chunk + chunkOffset = chunkOffset + int64(uploadResult.Size) // if last chunk was not at full chunk size, but already exhausted the reader - if uploadedSize < int64(chunkSize) { + if int64(uploadResult.Size) < int64(chunkSize) { break } - // reset variables for the next chunk - chunkOffset = chunkOffset + int64(uploadedSize) } path := r.URL.Path @@ -142,7 +141,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r glog.V(4).Infoln("saving", path) entry := &filer2.Entry{ - FullPath: filer2.FullPath(path), + FullPath: util.FullPath(path), Attr: filer2.Attr{ Mtime: time.Now(), Crtime: time.Now(), @@ -151,7 +150,9 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r Gid: OS_GID, Replication: replication, Collection: collection, - TtlSec: int32(util.ParseInt(r.URL.Query().Get("ttl"), 0)), + TtlSec: ttlSec, + Mime: contentType, + Md5: md5Hash.Sum(nil), }, Chunks: fileChunks, } @@ -172,8 +173,7 @@ func (fs *FilerServer) doAutoChunk(ctx context.Context, w http.ResponseWriter, r return } -func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, - limitedReader io.Reader, fileName string, contentType string, fileId string, auth security.EncodedJwt) (size int64, err error) { +func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error) { stats.FilerRequestCounter.WithLabelValues("postAutoChunkUpload").Inc() start := time.Now() @@ -181,9 +181,6 @@ func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *ht stats.FilerRequestHistogram.WithLabelValues("postAutoChunkUpload").Observe(time.Since(start).Seconds()) }() - uploadResult, uploadError := operation.Upload(urlLocation, fileName, limitedReader, false, contentType, nil, auth) - if uploadError != nil { - return 0, uploadError - } - return int64(uploadResult.Size), nil + uploadResult, err, _ := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) + return uploadResult, err } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go new file mode 100644 index 000000000..bea72b2c1 --- /dev/null +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -0,0 +1,89 @@ +package weed_server + +import ( + "context" + "fmt" + "net/http" + "strings" + "time" + + "github.com/chrislusf/seaweedfs/weed/filer2" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" +) + +// handling single chunk POST or PUT upload +func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *http.Request, + replication string, collection string, dataCenter string, ttlSeconds int32, ttlString string, fsync bool) (filerResult *FilerPostResult, err error) { + + fileId, urlLocation, auth, err := fs.assignNewFileInfo(w, r, replication, collection, dataCenter, ttlString, fsync) + + if err != nil || fileId == "" || urlLocation == "" { + return nil, fmt.Errorf("fail to allocate volume for %s, collection:%s, datacenter:%s", r.URL.Path, collection, dataCenter) + } + + glog.V(4).Infof("write %s to %v", r.URL.Path, urlLocation) + + // Note: encrypt(gzip(data)), encrypt data first, then gzip + + sizeLimit := int64(fs.option.MaxMB) * 1024 * 1024 + + pu, err := needle.ParseUpload(r, sizeLimit) + uncompressedData := pu.Data + if pu.IsGzipped { + uncompressedData = pu.UncompressedData + } + if pu.MimeType == "" { + pu.MimeType = http.DetectContentType(uncompressedData) + } + + uploadResult, uploadError := operation.UploadData(urlLocation, pu.FileName, true, uncompressedData, false, pu.MimeType, pu.PairMap, auth) + if uploadError != nil { + return nil, fmt.Errorf("upload to volume server: %v", uploadError) + } + + // Save to chunk manifest structure + fileChunks := []*filer_pb.FileChunk{uploadResult.ToPbFileChunk(fileId, 0)} + + // fmt.Printf("uploaded: %+v\n", uploadResult) + + path := r.URL.Path + if strings.HasSuffix(path, "/") { + if pu.FileName != "" { + path += pu.FileName + } + } + + entry := &filer2.Entry{ + FullPath: util.FullPath(path), + Attr: filer2.Attr{ + Mtime: time.Now(), + Crtime: time.Now(), + Mode: 0660, + Uid: OS_UID, + Gid: OS_GID, + Replication: replication, + Collection: collection, + TtlSec: ttlSeconds, + Mime: pu.MimeType, + }, + Chunks: fileChunks, + } + + filerResult = &FilerPostResult{ + Name: pu.FileName, + Size: int64(pu.OriginalDataSize), + } + + if dbErr := fs.filer.CreateEntry(ctx, entry, false); dbErr != nil { + fs.filer.DeleteChunks(entry.Chunks) + err = dbErr + filerResult.Error = dbErr.Error() + return + } + + return +} diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go index 2f0df7f91..f21cce7d1 100644 --- a/weed/server/filer_ui/breadcrumb.go +++ b/weed/server/filer_ui/breadcrumb.go @@ -1,8 +1,9 @@ package master_ui import ( - "path/filepath" "strings" + + "github.com/chrislusf/seaweedfs/weed/util" ) type Breadcrumb struct { @@ -16,7 +17,7 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) { for i := 0; i < len(parts); i++ { crumb := Breadcrumb{ Name: parts[i] + " /", - Link: "/" + filepath.ToSlash(filepath.Join(parts[0:i+1]...)), + Link: "/" + util.Join(parts[0:i+1]...), } if !strings.HasSuffix(crumb.Link, "/") { crumb.Link += "/" diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index fcfd98f7b..1ee214deb 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -1,18 +1,20 @@ package weed_server import ( + "context" "fmt" "net" "strings" "time" "github.com/chrislusf/raft" + "google.golang.org/grpc/peer" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/topology" - "google.golang.org/grpc/peer" ) func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error { @@ -22,8 +24,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ defer func() { if dn != nil { - glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port) + // if the volume server disconnects and reconnects quickly + // the unregister and register can race with each other t.UnRegisterDataNode(dn) + glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port) message := &master_pb.VolumeLocation{ Url: dn.Url(), @@ -61,14 +65,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ t.Sequence.SetMax(heartbeat.MaxFileKey) if dn == nil { - if heartbeat.Ip == "" { - if pr, ok := peer.FromContext(stream.Context()); ok { - if pr.Addr != net.Addr(nil) { - heartbeat.Ip = pr.Addr.String()[0:strings.LastIndex(pr.Addr.String(), ":")] - glog.V(0).Infof("remote IP address is detected as %v", heartbeat.Ip) - } - } - } dcName, rackName := t.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) dc := t.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) @@ -87,6 +83,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } } + if heartbeat.MaxVolumeCount != 0 && dn.GetMaxVolumeCount() != int64(heartbeat.MaxVolumeCount) { + delta := int64(heartbeat.MaxVolumeCount) - dn.GetMaxVolumeCount() + dn.UpAdjustMaxVolumeCountDelta(delta) + } + glog.V(4).Infof("master received heartbeat %s", heartbeat.String()) message := &master_pb.VolumeLocation{ Url: dn.Url(), @@ -189,35 +190,13 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ return ms.informNewLeader(stream) } - // remember client address - ctx := stream.Context() - // fmt.Printf("FromContext %+v\n", ctx) - pr, ok := peer.FromContext(ctx) - if !ok { - glog.Error("failed to get peer from ctx") - return fmt.Errorf("failed to get peer from ctx") - } - if pr.Addr == net.Addr(nil) { - glog.Error("failed to get peer address") - return fmt.Errorf("failed to get peer address") - } - - clientName := req.Name + pr.Addr.String() - glog.V(0).Infof("+ client %v", clientName) + peerAddress := findClientAddress(stream.Context(), req.GrpcPort) - messageChan := make(chan *master_pb.VolumeLocation) stopChan := make(chan bool) - ms.clientChansLock.Lock() - ms.clientChans[clientName] = messageChan - ms.clientChansLock.Unlock() + clientName, messageChan := ms.addClient(req.Name, peerAddress) - defer func() { - glog.V(0).Infof("- client %v", clientName) - ms.clientChansLock.Lock() - delete(ms.clientChans, clientName) - ms.clientChansLock.Unlock() - }() + defer ms.deleteClient(clientName) for _, message := range ms.Topo.ToVolumeLocations() { if err := stream.Send(message); err != nil { @@ -253,7 +232,6 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ } } - return nil } func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error { @@ -269,3 +247,57 @@ func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedSe } return nil } + +func (ms *MasterServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *master_pb.VolumeLocation) { + clientName = clientType + "@" + clientAddress + glog.V(0).Infof("+ client %v", clientName) + + messageChan = make(chan *master_pb.VolumeLocation) + + ms.clientChansLock.Lock() + ms.clientChans[clientName] = messageChan + ms.clientChansLock.Unlock() + return +} + +func (ms *MasterServer) deleteClient(clientName string) { + glog.V(0).Infof("- client %v", clientName) + ms.clientChansLock.Lock() + delete(ms.clientChans, clientName) + ms.clientChansLock.Unlock() +} + +func findClientAddress(ctx context.Context, grpcPort uint32) string { + // fmt.Printf("FromContext %+v\n", ctx) + pr, ok := peer.FromContext(ctx) + if !ok { + glog.Error("failed to get peer from ctx") + return "" + } + if pr.Addr == net.Addr(nil) { + glog.Error("failed to get peer address") + return "" + } + if grpcPort == 0 { + return pr.Addr.String() + } + if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok { + externalIP := tcpAddr.IP + return fmt.Sprintf("%s:%d", externalIP, grpcPort) + } + return pr.Addr.String() + +} + +func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.ListMasterClientsRequest) (*master_pb.ListMasterClientsResponse, error) { + resp := &master_pb.ListMasterClientsResponse{} + ms.clientChansLock.RLock() + defer ms.clientChansLock.RUnlock() + + for k := range ms.clientChans { + if strings.HasPrefix(k, req.ClientType+"@") { + resp.GrpcAddresses = append(resp.GrpcAddresses, k[len(req.ClientType)+1:]) + } + } + return resp, nil +} diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go new file mode 100644 index 000000000..7e7dcb36b --- /dev/null +++ b/weed/server/master_grpc_server_admin.go @@ -0,0 +1,138 @@ +package weed_server + +import ( + "context" + "fmt" + "math/rand" + "sync" + "time" + + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" +) + +/* +How exclusive lock works? +----------- + +Shell +------ +When shell lock, + * lease an admin token (lockTime, token) + * start a goroutine to renew the admin token periodically + +When shell unlock + * stop the renewal goroutine + * sends a release lock request + +Master +------ +Master maintains: + * randomNumber + * lastLockTime +When master receives the lease/renew request from shell + If lastLockTime still fresh { + if is a renew and token is valid { + // for renew + generate the randomNumber => token + return + } + refuse + return + } else { + // for fresh lease request + generate the randomNumber => token + return + } + +When master receives the release lock request from shell + set the lastLockTime to zero + + +The volume server does not need to verify. +This makes the lock/unlock optional, similar to what golang code usually does. + +*/ + +const ( + LockDuration = 10 * time.Second +) + +type AdminLock struct { + accessSecret int64 + accessLockTime time.Time +} + +type AdminLocks struct { + locks map[string]*AdminLock + sync.RWMutex +} + +func NewAdminLocks() *AdminLocks { + return &AdminLocks{ + locks: make(map[string]*AdminLock), + } +} + +func (locks *AdminLocks) isLocked(lockName string) bool { + locks.RLock() + defer locks.RUnlock() + adminLock, found := locks.locks[lockName] + if !found { + return false + } + return adminLock.accessLockTime.Add(LockDuration).After(time.Now()) +} + +func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64) bool { + locks.RLock() + defer locks.RUnlock() + adminLock, found := locks.locks[lockName] + if !found { + return false + } + return adminLock.accessLockTime.Equal(ts) && adminLock.accessSecret == token +} + +func (locks *AdminLocks) generateToken(lockName string) (ts time.Time, token int64) { + locks.Lock() + defer locks.Unlock() + lock := &AdminLock{ + accessSecret: rand.Int63(), + accessLockTime: time.Now(), + } + locks.locks[lockName] = lock + return lock.accessLockTime, lock.accessSecret +} + +func (locks *AdminLocks) deleteLock(lockName string) { + locks.Lock() + defer locks.Unlock() + delete(locks.locks, lockName) +} + +func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.LeaseAdminTokenRequest) (*master_pb.LeaseAdminTokenResponse, error) { + resp := &master_pb.LeaseAdminTokenResponse{} + + if ms.adminLocks.isLocked(req.LockName) { + if req.PreviousToken != 0 && ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) { + // for renew + ts, token := ms.adminLocks.generateToken(req.LockName) + resp.Token, resp.LockTsNs = token, ts.UnixNano() + return resp, nil + } + // refuse since still locked + return resp, fmt.Errorf("already locked") + } + // for fresh lease request + ts, token := ms.adminLocks.generateToken(req.LockName) + resp.Token, resp.LockTsNs = token, ts.UnixNano() + return resp, nil +} + +func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.ReleaseAdminTokenRequest) (*master_pb.ReleaseAdminTokenResponse, error) { + resp := &master_pb.ReleaseAdminTokenResponse{} + if ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) { + ms.adminLocks.deleteLock(req.LockName) + } + return resp, nil +} diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go index f02b0f242..b92d6bcbe 100644 --- a/weed/server/master_grpc_server_collection.go +++ b/weed/server/master_grpc_server_collection.go @@ -4,6 +4,7 @@ import ( "context" "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -57,8 +58,8 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error { } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { - _, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{ + err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) return deleteErr @@ -77,8 +78,8 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error { listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName) for _, server := range listOfEcServers { - err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { - _, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{ + err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) return deleteErr diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 856c07890..282c75679 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -121,8 +121,10 @@ func (ms *MasterServer) Statistics(ctx context.Context, req *master_pb.Statistic volumeLayout := ms.Topo.GetVolumeLayout(req.Collection, replicaPlacement, ttl) stats := volumeLayout.Stats() + totalSize := ms.Topo.GetMaxVolumeCount() * int64(ms.option.VolumeSizeLimitMB) * 1024 * 1024 + resp := &master_pb.StatisticsResponse{ - TotalSize: stats.TotalSize, + TotalSize: uint64(totalSize), UsedSize: stats.UsedSize, FileCount: stats.FileCount, } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index b3cc310e6..dc3ef9cfc 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,7 +1,6 @@ package weed_server import ( - "context" "fmt" "net/http" "net/http/httputil" @@ -33,6 +32,7 @@ const ( ) type MasterOption struct { + Host string Port int MetaFolder string VolumeSizeLimitMB uint @@ -65,6 +65,8 @@ type MasterServer struct { grpcDialOption grpc.DialOption MasterClient *wdclient.MasterClient + + adminLocks *AdminLocks } func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer { @@ -78,6 +80,9 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste v.SetDefault("jwt.signing.read.expires_after_seconds", 60) readExpiresAfterSec := v.GetInt("jwt.signing.read.expires_after_seconds") + v.SetDefault("master.replication.treat_replication_as_minimums", false) + replicationAsMin := v.GetBool("master.replication.treat_replication_as_minimums") + var preallocateSize int64 if option.VolumePreallocate { preallocateSize = int64(option.VolumeSizeLimitMB) * (1 << 20) @@ -89,7 +94,8 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste preallocateSize: preallocateSize, clientChans: make(map[string]chan *master_pb.VolumeLocation), grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(context.Background(), grpcDialOption, "master", peers), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, peers), + adminLocks: NewAdminLocks(), } ms.bounedLeaderChan = make(chan int, 16) @@ -97,7 +103,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste if nil == seq { glog.Fatalf("create sequencer failed.") } - ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds) + ms.Topo = topology.NewTopology("topo", seq, uint64(ms.option.VolumeSizeLimitMB)*1024*1024, ms.option.PulseSeconds, replicationAsMin) ms.vg = topology.NewDefaultVolumeGrowth() glog.V(0).Infoln("Volume Size Limit is", ms.option.VolumeSizeLimitMB, "MB") @@ -115,9 +121,11 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste r.HandleFunc("/vol/status", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeStatusHandler))) r.HandleFunc("/vol/vacuum", ms.proxyToLeader(ms.guard.WhiteList(ms.volumeVacuumHandler))) r.HandleFunc("/submit", ms.guard.WhiteList(ms.submitFromMasterServerHandler)) - r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler)) - r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) - r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) + /* + r.HandleFunc("/stats/health", ms.guard.WhiteList(statsHealthHandler)) + r.HandleFunc("/stats/counter", ms.guard.WhiteList(statsCounterHandler)) + r.HandleFunc("/stats/memory", ms.guard.WhiteList(statsMemoryHandler)) + */ r.HandleFunc("/{fileId}", ms.redirectHandler) } @@ -148,7 +156,7 @@ func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { } } -func (ms *MasterServer) proxyToLeader(f func(w http.ResponseWriter, r *http.Request)) func(w http.ResponseWriter, r *http.Request) { +func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { f(w, r) @@ -193,10 +201,14 @@ func (ms *MasterServer) startAdminScripts() { v.SetDefault("master.maintenance.sleep_minutes", 17) sleepMinutes := v.GetInt("master.maintenance.sleep_minutes") - v.SetDefault("master.filer.default_filer_url", "http://localhost:8888/") - filerURL := v.GetString("master.filer.default_filer_url") + v.SetDefault("master.filer.default", "localhost:8888") + filerHostPort := v.GetString("master.filer.default") scriptLines := strings.Split(adminScripts, "\n") + if !strings.Contains(adminScripts, "lock") { + scriptLines = append(append([]string{}, "lock"), scriptLines...) + scriptLines = append(scriptLines, "unlock") + } masterAddress := "localhost:" + strconv.Itoa(ms.option.Port) @@ -204,9 +216,10 @@ func (ms *MasterServer) startAdminScripts() { shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master") shellOptions.Masters = &masterAddress - shellOptions.FilerHost, shellOptions.FilerPort, shellOptions.Directory, err = util.ParseFilerUrl(filerURL) + shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(filerHostPort) + shellOptions.Directory = "/" if err != nil { - glog.V(0).Infof("failed to parse master.filer.default_filer_urll=%s : %v\n", filerURL, err) + glog.V(0).Infof("failed to parse master.filer.default = %s : %v\n", filerHostPort, err) return } @@ -220,27 +233,11 @@ func (ms *MasterServer) startAdminScripts() { commandEnv.MasterClient.WaitUntilConnected() c := time.Tick(time.Duration(sleepMinutes) * time.Minute) - for _ = range c { + for range c { if ms.Topo.IsLeader() { for _, line := range scriptLines { - - cmds := reg.FindAllString(line, -1) - if len(cmds) == 0 { - continue - } - args := make([]string, len(cmds[1:])) - for i := range args { - args[i] = strings.Trim(string(cmds[1+i]), "\"'") - } - cmd := strings.ToLower(cmds[0]) - - for _, c := range shell.Commands { - if c.Name() == cmd { - glog.V(0).Infof("executing: %s %v", cmd, args) - if err := c.Do(args, commandEnv, os.Stdout); err != nil { - glog.V(0).Infof("error: %v", err) - } - } + for _, c := range strings.Split(line, ";") { + processEachCmd(reg, c, commandEnv) } } } @@ -248,6 +245,27 @@ func (ms *MasterServer) startAdminScripts() { }() } +func processEachCmd(reg *regexp.Regexp, line string, commandEnv *shell.CommandEnv) { + cmds := reg.FindAllString(line, -1) + if len(cmds) == 0 { + return + } + args := make([]string, len(cmds[1:])) + for i := range args { + args[i] = strings.Trim(string(cmds[1+i]), "\"'") + } + cmd := strings.ToLower(cmds[0]) + + for _, c := range shell.Commands { + if c.Name() == cmd { + glog.V(0).Infof("executing: %s %v", cmd, args) + if err := c.Do(args, commandEnv, os.Stdout); err != nil { + glog.V(0).Infof("error: %v", err) + } + } + } +} + func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer { var seq sequence.Sequencer v := util.GetViper() diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 514d86800..ebcb7efd2 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -45,7 +45,7 @@ func (ms *MasterServer) dirLookupHandler(w http.ResponseWriter, r *http.Request) vid = fileId[0:commaSep] } } - collection := r.FormValue("collection") //optional, but can be faster if too many collections + collection := r.FormValue("collection") // optional, but can be faster if too many collections location := ms.findVolumeLocation(collection, vid) httpStatus := http.StatusOK if location.Error != "" || location.Locations == nil { @@ -72,9 +72,6 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo for _, loc := range machines { locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl}) } - if locations == nil { - err = fmt.Errorf("volume id %s not found", vid) - } } } else { machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid) @@ -83,6 +80,9 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo } err = getVidLocationsErr } + if len(locations) == 0 && err == nil { + err = fmt.Errorf("volume id %s not found", vid) + } ret := operation.LookupResult{ VolumeId: vid, Locations: locations, diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 44a04cb86..7595c0171 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -25,8 +25,8 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { - _, deleteErr := client.DeleteCollection(ctx, &volume_server_pb.DeleteCollectionRequest{ + err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collection.Name, }) return deleteErr @@ -44,7 +44,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) - m["Version"] = util.VERSION + m["Version"] = util.Version() m["Topology"] = ms.Topo.ToMap() writeJsonQuiet(w, r, http.StatusOK, m) } @@ -61,7 +61,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque return } } - glog.Infoln("garbageThreshold =", gcThreshold) + // glog.Infoln("garbageThreshold =", gcThreshold) ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.preallocateSize) ms.dirStatusHandler(w, r) } @@ -93,7 +93,7 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request func (ms *MasterServer) volumeStatusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) - m["Version"] = util.VERSION + m["Version"] = util.Version() m["Volumes"] = ms.Topo.ToVolumeMap() writeJsonQuiet(w, r, http.StatusOK, m) } diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go index f241df87f..9cd58158b 100644 --- a/weed/server/master_server_handlers_ui.go +++ b/weed/server/master_server_handlers_ui.go @@ -2,6 +2,7 @@ package weed_server import ( "net/http" + "time" "github.com/chrislusf/raft" ui "github.com/chrislusf/seaweedfs/weed/server/master_ui" @@ -11,7 +12,7 @@ import ( func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) { infos := make(map[string]interface{}) - infos["Version"] = util.VERSION + infos["Up Time"] = time.Now().Sub(startTime).String() args := struct { Version string Topology interface{} @@ -19,7 +20,7 @@ func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) Stats map[string]interface{} Counters *stats.ServerStats }{ - util.VERSION, + util.Version(), ms.Topo.ToMap(), ms.Topo.RaftServer, infos, diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go index b674e3f82..7189064d0 100644 --- a/weed/server/master_ui/templates.go +++ b/weed/server/master_ui/templates.go @@ -76,6 +76,7 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html> <th>Rack</th> <th>RemoteAddr</th> <th>#Volumes</th> + <th>Volume Ids</th> <th>#ErasureCodingShards</th> <th>Max</th> </tr> @@ -89,6 +90,7 @@ var StatusTpl = template.Must(template.New("status").Parse(`<!DOCTYPE html> <td>{{ $rack.Id }}</td> <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a></td> <td>{{ $dn.Volumes }}</td> + <td>{{ $dn.VolumeIds}}</td> <td>{{ $dn.EcShards }}</td> <td>{{ $dn.Max }}</td> </tr> diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 53289f1c1..0381c7feb 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,8 +2,6 @@ package weed_server import ( "encoding/json" - "github.com/chrislusf/seaweedfs/weed/util" - "google.golang.org/grpc" "io/ioutil" "os" "path" @@ -11,7 +9,12 @@ import ( "sort" "time" + "google.golang.org/grpc" + + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/raft" + "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/topology" ) @@ -61,7 +64,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d s.raftServer.Start() for _, peer := range s.peers { - s.raftServer.AddPeer(peer, util.ServerToGrpcAddress(peer)) + s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)) } s.GrpcServer = raft.NewGrpcServer(s.raftServer) @@ -72,7 +75,7 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ Name: s.raftServer.Name(), - ConnectionString: util.ServerToGrpcAddress(s.serverAddr), + ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), }) if err != nil { diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 43987b748..27b21ac09 100644 --- a/weed/server/volume_grpc_admin.go +++ b/weed/server/volume_grpc_admin.go @@ -3,9 +3,11 @@ package weed_server import ( "context" "fmt" + "path/filepath" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/super_block" ) @@ -148,3 +150,19 @@ func (vs *VolumeServer) VolumeMarkReadonly(ctx context.Context, req *volume_serv return resp, err } + +func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) { + + resp := &volume_server_pb.VolumeServerStatusResponse{} + + for _, loc := range vs.store.Locations { + if dir, e := filepath.Abs(loc.Directory); e == nil { + resp.DiskStatuses = append(resp.DiskStatuses, stats.NewDiskStatus(dir)) + } + } + + resp.MemoryStatus = stats.MemStat() + + return resp, nil + +} diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go index fdb7937d2..501964191 100644 --- a/weed/server/volume_grpc_batch_delete.go +++ b/weed/server/volume_grpc_batch_delete.go @@ -8,6 +8,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" ) func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.BatchDeleteRequest) (*volume_server_pb.BatchDeleteResponse, error) { @@ -28,16 +29,34 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B n := new(needle.Needle) volumeId, _ := needle.NewVolumeId(vid) - n.ParsePath(id_cookie) - - cookie := n.Cookie - if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil { - resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ - FileId: fid, - Status: http.StatusNotFound, - Error: err.Error(), - }) - continue + if req.SkipCookieCheck { + n.Id, err = types.ParseNeedleId(id_cookie) + if err != nil { + resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ + FileId: fid, + Status: http.StatusBadRequest, + Error: err.Error()}) + continue + } + } else { + n.ParsePath(id_cookie) + cookie := n.Cookie + if _, err := vs.store.ReadVolumeNeedle(volumeId, n); err != nil { + resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ + FileId: fid, + Status: http.StatusNotFound, + Error: err.Error(), + }) + continue + } + if n.Cookie != cookie { + resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ + FileId: fid, + Status: http.StatusBadRequest, + Error: "File Random Cookie does not match.", + }) + break + } } if n.IsChunkedManifest() { @@ -49,14 +68,6 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B continue } - if n.Cookie != cookie { - resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ - FileId: fid, - Status: http.StatusBadRequest, - Error: "File Random Cookie does not match.", - }) - break - } n.LastModified = now if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil { resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{ diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index dc47c2884..7cb836344 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -7,6 +7,7 @@ import ( "google.golang.org/grpc" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/backend" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" @@ -34,15 +35,18 @@ func (vs *VolumeServer) heartbeat() { for { for _, master := range vs.SeedMasterNodes { if newLeader != "" { + // the new leader may actually is the same master + // need to wait a bit before adding itself + time.Sleep(3 * time.Second) master = newLeader } - masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(master) + masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(master) if parseErr != nil { glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr) continue } vs.store.MasterAddress = master - newLeader, err = vs.doHeartbeat(context.Background(), master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second) + newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second) if err != nil { glog.V(0).Infof("heartbeat error: %v", err) time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) @@ -53,16 +57,16 @@ func (vs *VolumeServer) heartbeat() { } } -func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { +func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { - grpcConection, err := util.GrpcDial(ctx, masterGrpcAddress, grpcDialOption) + grpcConection, err := pb.GrpcDial(context.Background(), masterGrpcAddress, grpcDialOption) if err != nil { return "", fmt.Errorf("fail to dial %s : %v", masterNode, err) } defer grpcConection.Close() client := master_pb.NewSeaweedClient(grpcConection) - stream, err := client.SendHeartbeat(ctx) + stream, err := client.SendHeartbeat(context.Background()) if err != nil { glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err) return "", err @@ -79,8 +83,13 @@ func (vs *VolumeServer) doHeartbeat(ctx context.Context, masterNode, masterGrpcA doneChan <- err return } - if in.GetVolumeSizeLimit() != 0 { + if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() { vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit()) + if vs.store.MaybeAdjustVolumeMax() { + if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + } + } } if in.GetLeader() != "" && masterNode != in.GetLeader() && !isSameIP(in.GetLeader(), masterNode) { glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), masterNode) diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 6d74f8171..5c7d5572c 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -6,7 +6,6 @@ import ( "io" "math" "os" - "path" "time" "github.com/chrislusf/seaweedfs/weed/glog" @@ -25,7 +24,20 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v != nil { - return nil, fmt.Errorf("volume %d already exists", req.VolumeId) + + glog.V(0).Infof("volume %d already exists. deleted before copying...", req.VolumeId) + + err := vs.store.UnmountVolume(needle.VolumeId(req.VolumeId)) + if err != nil { + return nil, fmt.Errorf("failed to mount existing volume %d: %v", req.VolumeId, err) + } + + err = vs.store.DeleteVolume(needle.VolumeId(req.VolumeId)) + if err != nil { + return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err) + } + + glog.V(0).Infof("deleted exisitng volume %d before copying.", req.VolumeId) } location := vs.store.FindFreeLocation() @@ -41,9 +53,9 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // confirm size and timestamp var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse var volumeFileName, idxFileName, datFileName string - err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { var err error - volFileInfoResp, err = client.ReadVolumeFileStatus(ctx, + volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(), &volume_server_pb.ReadVolumeFileStatusRequest{ VolumeId: req.VolumeId, }) @@ -55,30 +67,38 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // println("source:", volFileInfoResp.String()) // copy ecx file - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil { + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, volumeFileName, ".idx", false, false); err != nil { return err } - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil { + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".dat", false, true); err != nil { return err } - if err := vs.doCopyFile(ctx, client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".vif", false, true); err != nil { + if err := vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, volumeFileName, ".vif", false, true); err != nil { return err } return nil }) + if err != nil { + return nil, err + } + if volumeFileName == "" { + return nil, fmt.Errorf("not found volume %d file", req.VolumeId) + } + idxFileName = volumeFileName + ".idx" datFileName = volumeFileName + ".dat" - if err != nil && volumeFileName != "" { - os.Remove(idxFileName) - os.Remove(datFileName) - os.Remove(volumeFileName + ".vif") - return nil, err - } + defer func() { + if err != nil && volumeFileName != "" { + os.Remove(idxFileName) + os.Remove(datFileName) + os.Remove(volumeFileName + ".vif") + } + }() if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16 return nil, err @@ -95,10 +115,9 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }, err } -func (vs *VolumeServer) doCopyFile(ctx context.Context, client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid uint32, - compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend bool, ignoreSourceFileNotFound bool) error { +func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error { - copyFileClient, err := client.CopyFile(ctx, &volume_server_pb.CopyFileRequest{ + copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ VolumeId: vid, Ext: ext, CompactionRevision: compactRevision, @@ -209,7 +228,7 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v } else { baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext for _, location := range vs.store.Locations { - tName := path.Join(location.Directory, baseFileName) + tName := util.Join(location.Directory, baseFileName) if util.FileExists(tName) { fileName = tName } diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 256e7c447..66dd5bf8d 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -106,11 +106,11 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv baseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId)) - err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(ctx context.Context, client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { // copy ec data slices for _, shardId := range req.ShardIds { - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil { + if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil { return err } } @@ -118,7 +118,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv if req.CopyEcxFile { // copy ecx file - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil { + if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecx", false, false); err != nil { return err } return nil @@ -126,14 +126,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv if req.CopyEcjFile { // copy ecj file - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil { + if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".ecj", true, true); err != nil { return err } } if req.CopyVifFile { // copy vif file - if err := vs.doCopyFile(ctx, client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil { + if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, baseFileName, ".vif", false, true); err != nil { return err } } diff --git a/weed/server/volume_grpc_file.go b/weed/server/volume_grpc_file.go new file mode 100644 index 000000000..4d71ddeb1 --- /dev/null +++ b/weed/server/volume_grpc_file.go @@ -0,0 +1,129 @@ +package weed_server + +import ( + "encoding/json" + "net/http" + "strings" + + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (vs *VolumeServer) FileGet(req *volume_server_pb.FileGetRequest, stream volume_server_pb.VolumeServer_FileGetServer) error { + + headResponse := &volume_server_pb.FileGetResponse{} + n := new(needle.Needle) + + commaIndex := strings.LastIndex(req.FileId, ",") + vid := req.FileId[:commaIndex] + fid := req.FileId[commaIndex+1:] + + volumeId, err := needle.NewVolumeId(vid) + if err != nil { + headResponse.ErrorCode = http.StatusBadRequest + return stream.Send(headResponse) + } + err = n.ParsePath(fid) + if err != nil { + headResponse.ErrorCode = http.StatusBadRequest + return stream.Send(headResponse) + } + + hasVolume := vs.store.HasVolume(volumeId) + _, hasEcVolume := vs.store.FindEcVolume(volumeId) + + if !hasVolume && !hasEcVolume { + headResponse.ErrorCode = http.StatusMovedPermanently + return stream.Send(headResponse) + } + + cookie := n.Cookie + var count int + if hasVolume { + count, err = vs.store.ReadVolumeNeedle(volumeId, n) + } else if hasEcVolume { + count, err = vs.store.ReadEcShardNeedle(volumeId, n) + } + + if err != nil || count < 0 { + headResponse.ErrorCode = http.StatusNotFound + return stream.Send(headResponse) + } + if n.Cookie != cookie { + headResponse.ErrorCode = http.StatusNotFound + return stream.Send(headResponse) + } + + if n.LastModified != 0 { + headResponse.LastModified = n.LastModified + } + + headResponse.Etag = n.Etag() + + if n.HasPairs() { + pairMap := make(map[string]string) + err = json.Unmarshal(n.Pairs, &pairMap) + if err != nil { + glog.V(0).Infoln("Unmarshal pairs error:", err) + } + headResponse.Headers = pairMap + } + + /* + // skip this, no redirection + if vs.tryHandleChunkedFile(n, filename, w, r) { + return + } + */ + + if n.NameSize > 0 { + headResponse.Filename = string(n.Name) + } + mtype := "" + if n.MimeSize > 0 { + mt := string(n.Mime) + if !strings.HasPrefix(mt, "application/octet-stream") { + mtype = mt + } + } + headResponse.ContentType = mtype + + headResponse.IsGzipped = n.IsGzipped() + + if n.IsGzipped() && req.AcceptGzip { + if n.Data, err = util.UnGzipData(n.Data); err != nil { + glog.V(0).Infof("ungzip %s error: %v", req.FileId, err) + } + } + + headResponse.ContentLength = uint32(len(n.Data)) + bytesToRead := len(n.Data) + bytesRead := 0 + + t := headResponse + + for bytesRead < bytesToRead { + + stopIndex := bytesRead + BufferSizeLimit + if stopIndex > bytesToRead { + stopIndex = bytesToRead + } + + if t == nil { + t = &volume_server_pb.FileGetResponse{} + } + t.Data = n.Data[bytesRead:stopIndex] + + err = stream.Send(t) + t = nil + if err != nil { + return err + } + + bytesRead = stopIndex + } + + return nil +} diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go index c26d6ed8f..2dde5b69c 100644 --- a/weed/server/volume_grpc_tail.go +++ b/weed/server/volume_grpc_tail.go @@ -90,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv defer glog.V(1).Infof("receive tailing volume %d finished", v.Id) return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error { - _, _, err := vs.store.WriteVolumeNeedle(v.Id, n) + _, err := vs.store.WriteVolumeNeedle(v.Id, n, false) return err }) diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go index 24f982241..b87de4b5b 100644 --- a/weed/server/volume_grpc_vacuum.go +++ b/weed/server/volume_grpc_vacuum.go @@ -51,6 +51,11 @@ func (vs *VolumeServer) VacuumVolumeCommit(ctx context.Context, req *volume_serv } else { glog.V(1).Infof("commit volume %d", req.VolumeId) } + if err == nil { + if vs.store.GetVolume(needle.VolumeId(req.VolumeId)).IsReadOnly() { + resp.IsReadOnly = true + } + } return resp, err diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 0fdcf662a..2d716edc1 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -77,9 +77,11 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, // only expose the volume server details for safe environments adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler) adminMux.HandleFunc("/status", vs.guard.WhiteList(vs.statusHandler)) - adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) - adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) - adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) + /* + adminMux.HandleFunc("/stats/counter", vs.guard.WhiteList(statsCounterHandler)) + adminMux.HandleFunc("/stats/memory", vs.guard.WhiteList(statsMemoryHandler)) + adminMux.HandleFunc("/stats/disk", vs.guard.WhiteList(vs.statsDiskHandler)) + */ } adminMux.HandleFunc("/", vs.privateStoreHandler) if publicMux != adminMux { diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go index 1938a34c4..34655d833 100644 --- a/weed/server/volume_server_handlers_admin.go +++ b/weed/server/volume_server_handlers_admin.go @@ -11,14 +11,21 @@ import ( func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) - m["Version"] = util.VERSION + m["Version"] = util.Version() + var ds []*volume_server_pb.DiskStatus + for _, loc := range vs.store.Locations { + if dir, e := filepath.Abs(loc.Directory); e == nil { + ds = append(ds, stats.NewDiskStatus(dir)) + } + } + m["DiskStatuses"] = ds m["Volumes"] = vs.store.VolumeInfos() writeJsonQuiet(w, r, http.StatusOK, m) } func (vs *VolumeServer) statsDiskHandler(w http.ResponseWriter, r *http.Request) { m := make(map[string]interface{}) - m["Version"] = util.VERSION + m["Version"] = util.Version() var ds []*volume_server_pb.DiskStatus for _, loc := range vs.store.Locations { if dir, e := filepath.Abs(loc.Directory); e == nil { diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index d89d13a0d..19b459136 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -2,21 +2,18 @@ package weed_server import ( "bytes" - "context" + "encoding/json" "errors" "fmt" "io" "mime" - "mime/multipart" "net/http" "net/url" - "path" + "path/filepath" "strconv" "strings" "time" - "encoding/json" - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/operation" @@ -43,13 +40,13 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) volumeId, err := needle.NewVolumeId(vid) if err != nil { - glog.V(2).Infoln("parsing error:", err, r.URL.Path) + glog.V(2).Infof("parsing vid %s: %v", r.URL.Path, err) w.WriteHeader(http.StatusBadRequest) return } err = n.ParsePath(fid) if err != nil { - glog.V(2).Infoln("parsing fid error:", err, r.URL.Path) + glog.V(2).Infof("parsing fid %s: %v", r.URL.Path, err) w.WriteHeader(http.StatusBadRequest) return } @@ -86,7 +83,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) if hasVolume { count, err = vs.store.ReadVolumeNeedle(volumeId, n) } else if hasEcVolume { - count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n) + count, err = vs.store.ReadEcShardNeedle(volumeId, n) } // glog.V(4).Infoln("read bytes", count, "error", err) if err != nil || count < 0 { @@ -114,11 +111,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) w.WriteHeader(http.StatusNotModified) return } - if r.Header.Get("ETag-MD5") == "True" { - setEtag(w, n.MD5()) - } else { - setEtag(w, n.Etag()) - } + setEtag(w, n.Etag()) if n.HasPairs() { pairMap := make(map[string]string) @@ -131,14 +124,14 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } - if vs.tryHandleChunkedFile(n, filename, w, r) { + if vs.tryHandleChunkedFile(n, filename, ext, w, r) { return } if n.NameSize > 0 && filename == "" { filename = string(n.Name) if ext == "" { - ext = path.Ext(filename) + ext = filepath.Ext(filename) } } mtype := "" @@ -152,7 +145,13 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) if ext != ".gz" { if n.IsGzipped() { if strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { - w.Header().Set("Content-Encoding", "gzip") + if _, _, _, shouldResize := shouldResizeImages(ext, r); shouldResize { + if n.Data, err = util.UnGzipData(n.Data); err != nil { + glog.V(0).Infoln("ungzip error:", err, r.URL.Path) + } + } else { + w.Header().Set("Content-Encoding", "gzip") + } } else { if n.Data, err = util.UnGzipData(n.Data); err != nil { glog.V(0).Infoln("ungzip error:", err, r.URL.Path) @@ -168,7 +167,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } } -func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, w http.ResponseWriter, r *http.Request) (processed bool) { +func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, ext string, w http.ResponseWriter, r *http.Request) (processed bool) { if !n.IsChunkedManifest() || r.URL.Query().Get("cm") == "false" { return false } @@ -182,7 +181,9 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, fileName = chunkManifest.Name } - ext := path.Ext(fileName) + if ext == "" { + ext = filepath.Ext(fileName) + } mType := "" if chunkManifest.Mime != "" { @@ -194,10 +195,7 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, w.Header().Set("X-File-Store", "chunked") - chunkedFileReader := &operation.ChunkedFileReader{ - Manifest: chunkManifest, - Master: vs.GetMaster(), - } + chunkedFileReader := operation.NewChunkedFileReader(chunkManifest.Chunks, vs.GetMaster()) defer chunkedFileReader.Close() rs := conditionallyResizeImages(chunkedFileReader, ext, r) @@ -210,132 +208,56 @@ func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, func conditionallyResizeImages(originalDataReaderSeeker io.ReadSeeker, ext string, r *http.Request) io.ReadSeeker { rs := originalDataReaderSeeker + + width, height, mode, shouldResize := shouldResizeImages(ext, r) + if shouldResize { + rs, _, _ = images.Resized(ext, originalDataReaderSeeker, width, height, mode) + } + return rs +} + +func shouldResizeImages(ext string, r *http.Request) (width, height int, mode string, shouldResize bool) { if len(ext) > 0 { ext = strings.ToLower(ext) } if ext == ".png" || ext == ".jpg" || ext == ".jpeg" || ext == ".gif" { - width, height := 0, 0 if r.FormValue("width") != "" { width, _ = strconv.Atoi(r.FormValue("width")) } if r.FormValue("height") != "" { height, _ = strconv.Atoi(r.FormValue("height")) } - rs, _, _ = images.Resized(ext, originalDataReaderSeeker, width, height, r.FormValue("mode")) } - return rs + mode = r.FormValue("mode") + shouldResize = width > 0 || height > 0 + return } func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.ResponseWriter, r *http.Request) error { totalSize, e := rs.Seek(0, 2) if mimeType == "" { - if ext := path.Ext(filename); ext != "" { + if ext := filepath.Ext(filename); ext != "" { mimeType = mime.TypeByExtension(ext) } } if mimeType != "" { w.Header().Set("Content-Type", mimeType) } - if filename != "" { - contentDisposition := "inline" - if r.FormValue("dl") != "" { - if dl, _ := strconv.ParseBool(r.FormValue("dl")); dl { - contentDisposition = "attachment" - } - } - w.Header().Set("Content-Disposition", contentDisposition+`; filename="`+fileNameEscaper.Replace(filename)+`"`) - } w.Header().Set("Accept-Ranges", "bytes") + if r.Method == "HEAD" { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) return nil } - rangeReq := r.Header.Get("Range") - if rangeReq == "" { - w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) - if _, e = rs.Seek(0, 0); e != nil { - return e - } - _, e = io.Copy(w, rs) - return e - } - //the rest is dealing with partial content request - //mostly copy from src/pkg/net/http/fs.go - ranges, err := parseRange(rangeReq, totalSize) - if err != nil { - http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) - return nil - } - if sumRangesSize(ranges) > totalSize { - // The total number of bytes in all the ranges - // is larger than the size of the file by - // itself, so this is probably an attack, or a - // dumb client. Ignore the range request. - return nil - } - if len(ranges) == 0 { - return nil - } - if len(ranges) == 1 { - // RFC 2616, Section 14.16: - // "When an HTTP message includes the content of a single - // range (for example, a response to a request for a - // single range, or to a request for a set of ranges - // that overlap without any holes), this content is - // transmitted with a Content-Range header, and a - // Content-Length header showing the number of bytes - // actually transferred. - // ... - // A response to a request for a single range MUST NOT - // be sent using the multipart/byteranges media type." - ra := ranges[0] - w.Header().Set("Content-Length", strconv.FormatInt(ra.length, 10)) - w.Header().Set("Content-Range", ra.contentRange(totalSize)) - w.WriteHeader(http.StatusPartialContent) - if _, e = rs.Seek(ra.start, 0); e != nil { + adjustHeadersAfterHEAD(w, r, filename) + + processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error { + if _, e = rs.Seek(offset, 0); e != nil { return e } - - _, e = io.CopyN(w, rs, ra.length) + _, e = io.CopyN(writer, rs, size) return e - } - // process multiple ranges - for _, ra := range ranges { - if ra.start > totalSize { - http.Error(w, "Out of Range", http.StatusRequestedRangeNotSatisfiable) - return nil - } - } - sendSize := rangesMIMESize(ranges, mimeType, totalSize) - pr, pw := io.Pipe() - mw := multipart.NewWriter(pw) - w.Header().Set("Content-Type", "multipart/byteranges; boundary="+mw.Boundary()) - sendContent := pr - defer pr.Close() // cause writing goroutine to fail and exit if CopyN doesn't finish. - go func() { - for _, ra := range ranges { - part, e := mw.CreatePart(ra.mimeHeader(mimeType, totalSize)) - if e != nil { - pw.CloseWithError(e) - return - } - if _, e = rs.Seek(ra.start, 0); e != nil { - pw.CloseWithError(e) - return - } - if _, e = io.CopyN(part, rs, ra.length); e != nil { - pw.CloseWithError(e) - return - } - } - mw.Close() - pw.Close() - }() - if w.Header().Get("Content-Encoding") == "" { - w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10)) - } - w.WriteHeader(http.StatusPartialContent) - _, e = io.CopyN(w, sendContent, sendSize) - return e + }) + return nil } diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go index 8d35c9c8b..8b2027e7b 100644 --- a/weed/server/volume_server_handlers_ui.go +++ b/weed/server/volume_server_handlers_ui.go @@ -40,7 +40,7 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) Stats interface{} Counters *stats.ServerStats }{ - util.VERSION, + util.Version(), vs.SeedMasterNodes, normalVolumeInfos, vs.store.EcVolumes(), diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go index cd35255e5..9a00dcc29 100644 --- a/weed/server/volume_server_handlers_write.go +++ b/weed/server/volume_server_handlers_write.go @@ -1,7 +1,6 @@ package weed_server import ( - "context" "errors" "fmt" "net/http" @@ -43,18 +42,19 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { return } - needle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes) + reqNeedle, originalSize, ne := needle.CreateNeedleFromRequest(r, vs.FixJpgOrientation, vs.fileSizeLimitBytes) if ne != nil { writeJsonError(w, r, http.StatusBadRequest, ne) return } ret := operation.UploadResult{} - _, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r) + isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, reqNeedle, r) - // http 304 status code does not allow body + // http 204 status code does not allow body if writeError == nil && isUnchanged { - w.WriteHeader(http.StatusNotModified) + setEtag(w, reqNeedle.Etag()) + w.WriteHeader(http.StatusNoContent) return } @@ -63,11 +63,12 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) { httpStatus = http.StatusInternalServerError ret.Error = writeError.Error() } - if needle.HasName() { - ret.Name = string(needle.Name) + if reqNeedle.HasName() { + ret.Name = string(reqNeedle.Name) } ret.Size = uint32(originalSize) - ret.ETag = needle.Etag() + ret.ETag = reqNeedle.Etag() + ret.Mime = string(reqNeedle.Mime) setEtag(w, ret.ETag) writeJsonQuiet(w, r, httpStatus, ret) } @@ -97,7 +98,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { ecVolume, hasEcVolume := vs.store.FindEcVolume(volumeId) if hasEcVolume { - count, err := vs.store.DeleteEcShardNeedle(context.Background(), ecVolume, n, cookie) + count, err := vs.store.DeleteEcShardNeedle(ecVolume, n, cookie) writeDeleteResult(err, count, w, r) return } @@ -125,7 +126,7 @@ func (vs *VolumeServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { return } // make sure all chunks had deleted before delete manifest - if e := chunkManifest.DeleteChunks(vs.GetMaster(), vs.grpcDialOption); e != nil { + if e := chunkManifest.DeleteChunks(vs.GetMaster(), false, vs.grpcDialOption); e != nil { writeJsonError(w, r, http.StatusInternalServerError, fmt.Errorf("Delete chunks error: %v", e)) return } @@ -165,3 +166,11 @@ func setEtag(w http.ResponseWriter, etag string) { } } } + +func getEtag(resp *http.Response) (etag string) { + etag = resp.Header.Get("ETag") + if strings.HasPrefix(etag, "\"") && strings.HasSuffix(etag, "\"") { + return etag[1 : len(etag)-1] + } + return +} diff --git a/weed/server/volume_server_ui/templates.go b/weed/server/volume_server_ui/templates.go index 81496b1de..8705bc088 100644 --- a/weed/server/volume_server_ui/templates.go +++ b/weed/server/volume_server_ui/templates.go @@ -1,11 +1,17 @@ package master_ui import ( + "fmt" + "github.com/chrislusf/seaweedfs/weed/util" "html/template" "strconv" "strings" ) +func percentFrom(total uint64, part_of uint64) string { + return fmt.Sprintf("%.2f", (float64(part_of)/float64(total))*100) +} + func join(data []int64) string { var ret []string for _, d := range data { @@ -15,7 +21,9 @@ func join(data []int64) string { } var funcMap = template.FuncMap{ - "join": join, + "join": join, + "bytesToHumanReadable": util.BytesToHumanReadable, + "percentFrom": percentFrom, } var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOCTYPE html> @@ -57,13 +65,25 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <div class="row"> <div class="col-sm-6"> <h2>Disk Stats</h2> - <table class="table table-condensed table-striped"> + <table class="table table-striped"> + <thead> + <tr> + <th>Path</th> + <th>Total</th> + <th>Free</th> + <th>Usage</th> + </tr> + </thead> + <tbody> {{ range .DiskStatuses }} <tr> - <th>{{ .Dir }}</th> - <td>{{ .Free }} Bytes Free</td> + <td>{{ .Dir }}</td> + <td>{{ bytesToHumanReadable .All }}</td> + <td>{{ bytesToHumanReadable .Free }}</td> + <td>{{ percentFrom .All .Used}}%</td> </tr> {{ end }} + </tbody> </table> </div> @@ -119,9 +139,9 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <tr> <td><code>{{ .Id }}</code></td> <td>{{ .Collection }}</td> - <td>{{ .Size }} Bytes</td> + <td>{{ bytesToHumanReadable .Size }}</td> <td>{{ .FileCount }}</td> - <td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td> + <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td> <td>{{ .Ttl }}</td> <td>{{ .ReadOnly }}</td> </tr> @@ -149,9 +169,9 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <tr> <td><code>{{ .Id }}</code></td> <td>{{ .Collection }}</td> - <td>{{ .Size }} Bytes</td> + <td>{{ bytesToHumanReadable .Size }}</td> <td>{{ .FileCount }}</td> - <td>{{ .DeleteCount }} / {{.DeletedByteCount}} Bytes</td> + <td>{{ .DeleteCount }} / {{bytesToHumanReadable .DeletedByteCount}}</td> <td>{{ .RemoteStorageName }}</td> <td>{{ .RemoteStorageKey }}</td> </tr> @@ -177,7 +197,7 @@ var StatusTpl = template.Must(template.New("status").Funcs(funcMap).Parse(`<!DOC <tr> <td><code>{{ .VolumeId }}</code></td> <td>{{ .Collection }}</td> - <td>{{ .ShardSize }} Bytes</td> + <td>{{ bytesToHumanReadable .ShardSize }}</td> <td>{{ .ShardIdList }}</td> <td>{{ .CreatedAt.Format "02 Jan 06 15:04 -0700" }}</td> </tr> diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index d75869f30..37c4afd5c 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -1,21 +1,24 @@ package weed_server import ( - "bytes" "context" "fmt" "io" + "math" "os" "path" "strings" "time" + "github.com/chrislusf/seaweedfs/weed/util/grace" "golang.org/x/net/webdav" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/chunk_cache" "github.com/chrislusf/seaweedfs/weed/filer2" "github.com/chrislusf/seaweedfs/weed/glog" @@ -31,6 +34,9 @@ type WebDavOption struct { Collection string Uid uint32 Gid uint32 + Cipher bool + CacheDir string + CacheSizeMB int64 } type WebDavServer struct { @@ -64,6 +70,7 @@ type WebDavFileSystem struct { secret security.SigningKey filer *filer2.Filer grpcDialOption grpc.DialOption + chunkCache *chunk_cache.ChunkCache } type FileInfo struct { @@ -88,22 +95,34 @@ type WebDavFile struct { off int64 entry *filer_pb.Entry entryViewCache []filer2.VisibleInterval + reader io.ReaderAt } func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) { + + chunkCache := chunk_cache.NewChunkCache(256, option.CacheDir, option.CacheSizeMB) + grace.OnInterrupt(func() { + chunkCache.Shutdown() + }) return &WebDavFileSystem{ - option: option, + option: option, + chunkCache: chunkCache, }, nil } -func (fs *WebDavFileSystem) WithFilerClient(ctx context.Context, fn func(context.Context, filer_pb.SeaweedFilerClient) error) error { +var _ = filer_pb.FilerClient(&WebDavFileSystem{}) - return util.WithCachedGrpcClient(ctx, func(ctx2 context.Context, grpcConnection *grpc.ClientConn) error { +func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error { + + return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) - return fn(ctx2, client) + return fn(client) }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption) } +func (fs *WebDavFileSystem) AdjustedUrl(hostAndPort string) string { + return hostAndPort +} func clearName(name string) (string, error) { slashed := strings.HasSuffix(name, "/") @@ -135,8 +154,8 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm return os.ErrExist } - return fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - dir, name := filer2.FullPath(fullDirPath).DirAndName() + return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + dir, name := util.FullPath(fullDirPath).DirAndName() request := &filer_pb.CreateEntryRequest{ Directory: dir, Entry: &filer_pb.Entry{ @@ -153,7 +172,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm } glog.V(1).Infof("mkdir: %v", request) - if err := filer_pb.CreateEntry(ctx, client, request); err != nil { + if err := filer_pb.CreateEntry(client, request); err != nil { return fmt.Errorf("mkdir %s/%s: %v", dir, name, err) } @@ -183,9 +202,9 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f fs.removeAll(ctx, fullFilePath) } - dir, name := filer2.FullPath(fullFilePath).DirAndName() - err = fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - if err := filer_pb.CreateEntry(ctx, client, &filer_pb.CreateEntryRequest{ + dir, name := util.FullPath(fullFilePath).DirAndName() + err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { + if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ Directory: dir, Entry: &filer_pb.Entry{ Name: name, @@ -238,34 +257,10 @@ func (fs *WebDavFileSystem) removeAll(ctx context.Context, fullFilePath string) return err } - fi, err := fs.stat(ctx, fullFilePath) - if err != nil { - return err - } - - if fi.IsDir() { - //_, err = fs.db.Exec(`delete from filesystem where fullFilePath like $1 escape '\'`, strings.Replace(fullFilePath, `%`, `\%`, -1)+`%`) - } else { - //_, err = fs.db.Exec(`delete from filesystem where fullFilePath = ?`, fullFilePath) - } - dir, name := filer2.FullPath(fullFilePath).DirAndName() - err = fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { - - request := &filer_pb.DeleteEntryRequest{ - Directory: dir, - Name: name, - IsDeleteData: true, - } + dir, name := util.FullPath(fullFilePath).DirAndName() - glog.V(3).Infof("removing entry: %v", request) - _, err := client.DeleteEntry(ctx, request) - if err != nil { - return fmt.Errorf("remove %s: %v", fullFilePath, err) - } + return filer_pb.Remove(fs, dir, name, true, false, false) - return nil - }) - return err } func (fs *WebDavFileSystem) RemoveAll(ctx context.Context, name string) error { @@ -305,10 +300,10 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string) return os.ErrExist } - oldDir, oldBaseName := filer2.FullPath(oldName).DirAndName() - newDir, newBaseName := filer2.FullPath(newName).DirAndName() + oldDir, oldBaseName := util.FullPath(oldName).DirAndName() + newDir, newBaseName := util.FullPath(newName).DirAndName() - return fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AtomicRenameEntryRequest{ OldDirectory: oldDir, @@ -333,10 +328,10 @@ func (fs *WebDavFileSystem) stat(ctx context.Context, fullFilePath string) (os.F return nil, err } - fullpath := filer2.FullPath(fullFilePath) + fullpath := util.FullPath(fullFilePath) var fi FileInfo - entry, err := filer2.GetEntry(ctx, fs, fullpath) + entry, err := filer_pb.GetEntry(fs, fullpath) if entry == nil { return nil, os.ErrNotExist } @@ -367,10 +362,12 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { glog.V(2).Infof("WebDavFileSystem.Write %v", f.name) + dir, _ := util.FullPath(f.name).DirAndName() + var err error ctx := context.Background() if f.entry == nil { - f.entry, err = filer2.GetEntry(ctx, f.fs, filer2.FullPath(f.name)) + f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name)) } if f.entry == nil { @@ -382,13 +379,15 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { var fileId, host string var auth security.EncodedJwt + var collection, replication string - if err = f.fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + if err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { request := &filer_pb.AssignVolumeRequest{ Count: 1, - Replication: "000", + Replication: "", Collection: f.fs.option.Collection, + ParentPath: dir, } resp, err := client.AssignVolume(ctx, request) @@ -396,8 +395,12 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { glog.V(0).Infof("assign volume failure %v: %v", request, err) return err } + if resp.Error != "" { + return fmt.Errorf("assign volume failure %v: %v", request, resp.Error) + } fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) + collection, replication = resp.Collection, resp.Replication return nil }); err != nil { @@ -405,8 +408,7 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - bufReader := bytes.NewReader(buf) - uploadResult, err := operation.Upload(fileUrl, f.name, bufReader, false, "", nil, auth) + uploadResult, err := operation.UploadData(fileUrl, f.name, f.fs.option.Cipher, buf, false, "", nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, err) return 0, fmt.Errorf("upload data: %v", err) @@ -416,19 +418,12 @@ func (f *WebDavFile) Write(buf []byte) (int, error) { return 0, fmt.Errorf("upload result: %v", uploadResult.Error) } - chunk := &filer_pb.FileChunk{ - FileId: fileId, - Offset: f.off, - Size: uint64(len(buf)), - Mtime: time.Now().UnixNano(), - ETag: uploadResult.ETag, - } - - f.entry.Chunks = append(f.entry.Chunks, chunk) - dir, _ := filer2.FullPath(f.name).DirAndName() + f.entry.Chunks = append(f.entry.Chunks, uploadResult.ToPbFileChunk(fileId, f.off)) - err = f.fs.WithFilerClient(ctx, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + err = f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error { f.entry.Attributes.Mtime = time.Now().Unix() + f.entry.Attributes.Collection = collection + f.entry.Attributes.Replication = replication request := &filer_pb.UpdateEntryRequest{ Directory: dir, @@ -465,10 +460,9 @@ func (f *WebDavFile) Close() error { func (f *WebDavFile) Read(p []byte) (readSize int, err error) { glog.V(2).Infof("WebDavFileSystem.Read %v", f.name) - ctx := context.Background() if f.entry == nil { - f.entry, err = filer2.GetEntry(ctx, f.fs, filer2.FullPath(f.name)) + f.entry, err = filer_pb.GetEntry(f.fs, util.FullPath(f.name)) } if f.entry == nil { return 0, err @@ -481,33 +475,33 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) { } if f.entryViewCache == nil { f.entryViewCache = filer2.NonOverlappingVisibleIntervals(f.entry.Chunks) + f.reader = nil } - chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, f.off, len(p)) - - totalRead, err := filer2.ReadIntoBuffer(ctx, f.fs, filer2.FullPath(f.name), p, chunkViews, f.off) - if err != nil { - return 0, err + if f.reader == nil { + chunkViews := filer2.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt32) + f.reader = filer2.NewChunkReaderAtFromClient(f.fs, chunkViews, f.fs.chunkCache) } - readSize = int(totalRead) - glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+totalRead) + readSize, err = f.reader.ReadAt(p, f.off) - f.off += totalRead - if readSize == 0 { - return 0, io.EOF + glog.V(3).Infof("WebDavFileSystem.Read %v: [%d,%d)", f.name, f.off, f.off+int64(readSize)) + f.off += int64(readSize) + + if err != nil { + glog.Errorf("file read %s: %v", f.name, err) } return + } func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { glog.V(2).Infof("WebDavFileSystem.Readdir %v count %d", f.name, count) - ctx := context.Background() - dir, _ := filer2.FullPath(f.name).DirAndName() + dir, _ := util.FullPath(f.name).DirAndName() - err = filer2.ReadDirAllEntries(ctx, f.fs, filer2.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) { + err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error { fi := FileInfo{ size: int64(filer2.TotalSize(entry.GetChunks())), name: entry.Name, @@ -521,6 +515,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) { } glog.V(4).Infof("entry: %v", fi.name) ret = append(ret, &fi) + return nil }) old := f.off |
