| field | value | date |
|---|---|---|
| author | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| committer | yulai.li <blacktear23@gmail.com> | 2022-06-26 22:43:37 +0800 |
| commit | 46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch) | |
| tree | 734125b48b6d96f8796a2b89b924312cd169ef0e /weed/server | |
| parent | a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff) | |
| parent | dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff) | |
| download | seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.tar.xz, seaweedfs-46e0b629e529f3aff535f90dd25eb719adf1c0d0.zip | |
Update TiKV client version and add one-phase commit (1PC) support
Diffstat (limited to 'weed/server')
51 files changed, 2563 insertions, 784 deletions
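The commit title refers to the TiKV store upgrade, which lives outside this diffstat (under `weed/filer`), so the client change itself is not shown below. For orientation, here is a minimal, hedged sketch of one-phase commit with `tikv/client-go` v2 — the PD address and keys are placeholders, and the option name should be verified against the vendored client version:

```go
package main

import (
	"context"

	"github.com/tikv/client-go/v2/txnkv"
)

func main() {
	// Connect through PD (placeholder address).
	client, err := txnkv.NewClient([]string{"127.0.0.1:2379"})
	if err != nil {
		panic(err)
	}
	defer client.Close()

	txn, err := client.Begin()
	if err != nil {
		panic(err)
	}
	// 1PC: a transaction that touches a single region can commit in one
	// round trip, skipping the separate prewrite and commit phases.
	txn.SetEnable1PC(true)
	if err := txn.Set([]byte("k1"), []byte("v1")); err != nil {
		panic(err)
	}
	if err := txn.Commit(context.Background()); err != nil {
		panic(err)
	}
}
```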
diff --git a/weed/server/common.go b/weed/server/common.go index ad3842190..f02ec67ac 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -1,14 +1,17 @@ package weed_server import ( + "bufio" "bytes" "encoding/json" "errors" "fmt" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "io" "io/fs" "mime/multipart" "net/http" + "net/url" "path/filepath" "strconv" "strings" @@ -161,7 +164,16 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope } debug("upload file to store", url) - uploadResult, err := operation.UploadData(url, pu.FileName, false, pu.Data, pu.IsGzipped, pu.MimeType, pu.PairMap, assignResult.Auth) + uploadOption := &operation.UploadOption{ + UploadUrl: url, + Filename: pu.FileName, + Cipher: false, + IsInputCompressed: pu.IsGzipped, + MimeType: pu.MimeType, + PairMap: pu.PairMap, + Jwt: assignResult.Auth, + } + uploadResult, err := operation.UploadData(pu.Data, uploadOption) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return @@ -240,8 +252,20 @@ func handleStaticResources2(r *mux.Router) { r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(http.FS(StaticFS)))) } +func adjustPassthroughHeaders(w http.ResponseWriter, r *http.Request, filename string) { + for header, values := range r.Header { + if normalizedHeader, ok := s3_constants.PassThroughHeaders[strings.ToLower(header)]; ok { + w.Header()[normalizedHeader] = values + } + } + adjustHeaderContentDisposition(w, r, filename) +} func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, filename string) { + if contentDisposition := w.Header().Get("Content-Disposition"); contentDisposition != "" { + return + } if filename != "" { + filename = url.QueryEscape(filename) contentDisposition := "inline" if r.FormValue("dl") != "" { if dl, _ := strconv.ParseBool(r.FormValue("dl")); dl { @@ -254,10 +278,13 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) { rangeReq := r.Header.Get("Range") + bufferedWriter := bufio.NewWriterSize(w, 128*1024) + defer bufferedWriter.Flush() if rangeReq == "" { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) - if err := writeFn(w, 0, totalSize); err != nil { + if err := writeFn(bufferedWriter, 0, totalSize); err != nil { + glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -268,6 +295,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 //mostly copy from src/pkg/net/http/fs.go ranges, err := parseRange(rangeReq, totalSize) if err != nil { + glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err) http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable) return } @@ -298,8 +326,9 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64 w.Header().Set("Content-Range", ra.contentRange(totalSize)) w.WriteHeader(http.StatusPartialContent) - err = writeFn(w, ra.start, ra.length) + err = writeFn(bufferedWriter, ra.start, ra.length) if err != nil { + glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err) http.Error(w, err.Error(), http.StatusInternalServerError) return } @@ -338,7 +367,8 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize 
int64 w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10)) } w.WriteHeader(http.StatusPartialContent) - if _, err := io.CopyN(w, sendContent, sendSize); err != nil { + if _, err := io.CopyN(bufferedWriter, sendContent, sendSize); err != nil { + glog.Errorf("processRangeRequest err: %v", err) http.Error(w, "Internal Error", http.StatusInternalServerError) return } diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index e025e73dc..17d17c588 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -107,6 +107,7 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol locs = append(locs, &filer_pb.Location{ Url: loc.Url, PublicUrl: loc.PublicUrl, + GrpcPort: uint32(loc.GrpcPort), }) } resp.LocationsMap[vidString] = &filer_pb.Locations{ @@ -143,10 +144,15 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2) } + so, err := fs.detectStorageOption(string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "") + if err != nil { + return nil, err + } newEntry := filer.FromPbEntry(req.Directory, req.Entry) newEntry.Chunks = chunks + newEntry.TtlSec = so.TtlSeconds - createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures) + createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory) if createErr == nil { fs.filer.DeleteChunks(garbage) @@ -210,10 +216,11 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry if newEntry.Attributes != nil { so, _ := fs.detectStorageOption(fullpath, - newEntry.Attributes.Collection, - newEntry.Attributes.Replication, + "", + "", newEntry.Attributes.TtlSec, - newEntry.Attributes.DiskType, + "", + "", "", "", ) // ignore readonly error for capacity needed to manifestize @@ -257,7 +264,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo } entry.Chunks = append(entry.Chunks, req.Chunks...) 
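A recurring change in `common.go` above is routing every response write through a 128 KB `bufio.Writer` instead of writing to the `http.ResponseWriter` directly, so the many small writes a chunked or multi-range response produces are coalesced into fewer network writes. A standalone sketch of the same pattern (handler and sizes are illustrative, not SeaweedFS code):

```go
package main

import (
	"bufio"
	"io"
	"net/http"
	"strings"
)

func handler(w http.ResponseWriter, r *http.Request) {
	// Buffer writes so each small chunk does not become its own
	// network write; Flush on the way out to drain the tail.
	bw := bufio.NewWriterSize(w, 128*1024)
	defer bw.Flush()

	for i := 0; i < 1000; i++ {
		io.Copy(bw, strings.NewReader("small chunk of data\n"))
	}
}

func main() {
	http.HandleFunc("/", handler)
	http.ListenAndServe(":8080", nil)
}
```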
- so, err := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "") + so, err := fs.detectStorageOption(string(fullpath), "", "", entry.TtlSec, "", "", "", "") if err != nil { glog.Warningf("detectStorageOption: %v", err) return &filer_pb.AppendToEntryResponse{}, err @@ -268,7 +275,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo glog.V(0).Infof("MaybeManifestize: %v", err) } - err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil) + err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil, false) return &filer_pb.AppendToEntryResponse{}, err } @@ -287,7 +294,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) { - so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack) + so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode) if err != nil { glog.V(3).Infof("AssignVolume: %v", err) return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil @@ -306,10 +313,13 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol } return &filer_pb.AssignVolumeResponse{ - FileId: assignResult.Fid, - Count: int32(assignResult.Count), - Url: assignResult.Url, - PublicUrl: assignResult.PublicUrl, + FileId: assignResult.Fid, + Count: int32(assignResult.Count), + Location: &filer_pb.Location{ + Url: assignResult.Url, + PublicUrl: assignResult.PublicUrl, + GrpcPort: uint32(assignResult.GrpcPort), + }, Auth: string(assignResult.Auth), Collection: so.Collection, Replication: so.Replication, @@ -321,7 +331,7 @@ func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.Collect glog.V(4).Infof("CollectionList %v", req) resp = &filer_pb.CollectionListResponse{} - err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{ IncludeNormalVolumes: req.IncludeNormalVolumes, IncludeEcVolumes: req.IncludeEcVolumes, @@ -342,7 +352,7 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet glog.V(4).Infof("DeleteCollection %v", req) - err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error { + err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { _, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{ Name: req.GetCollection(), }) @@ -351,128 +361,3 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet return &filer_pb.DeleteCollectionResponse{}, err } - -func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) { - - var output *master_pb.StatisticsResponse - - err = fs.filer.MasterClient.WithClient(func(masterClient master_pb.SeaweedClient) error { - grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{ - Replication: req.Replication, - Collection: req.Collection, - Ttl: req.Ttl, - DiskType: req.DiskType, - 
}) - if grpcErr != nil { - return grpcErr - } - - output = grpcResponse - return nil - }) - - if err != nil { - return nil, err - } - - return &filer_pb.StatisticsResponse{ - TotalSize: output.TotalSize, - UsedSize: output.UsedSize, - FileCount: output.FileCount, - }, nil -} - -func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) { - - clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) - - t := &filer_pb.GetFilerConfigurationResponse{ - Masters: fs.option.Masters, - Collection: fs.option.Collection, - Replication: fs.option.DefaultReplication, - MaxMb: uint32(fs.option.MaxMB), - DirBuckets: fs.filer.DirBucketsPath, - Cipher: fs.filer.Cipher, - Signature: fs.filer.Signature, - MetricsAddress: fs.metricsAddress, - MetricsIntervalSec: int32(fs.metricsIntervalSec), - Version: util.Version(), - ClusterId: string(clusterId), - } - - glog.V(4).Infof("GetFilerConfiguration: %v", t) - - return t, nil -} - -func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error { - - req, err := stream.Recv() - if err != nil { - return err - } - - clientName := fmt.Sprintf("%s:%d", req.Name, req.GrpcPort) - m := make(map[string]bool) - for _, tp := range req.Resources { - m[tp] = true - } - fs.brokersLock.Lock() - fs.brokers[clientName] = m - glog.V(0).Infof("+ broker %v", clientName) - fs.brokersLock.Unlock() - - defer func() { - fs.brokersLock.Lock() - delete(fs.brokers, clientName) - glog.V(0).Infof("- broker %v: %v", clientName, err) - fs.brokersLock.Unlock() - }() - - for { - if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil { - glog.V(0).Infof("send broker %v: %+v", clientName, err) - return err - } - // println("replied") - - if _, err := stream.Recv(); err != nil { - glog.V(0).Infof("recv broker %v: %v", clientName, err) - return err - } - // println("received") - } - -} - -func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) { - - resp = &filer_pb.LocateBrokerResponse{} - - fs.brokersLock.Lock() - defer fs.brokersLock.Unlock() - - var localBrokers []*filer_pb.LocateBrokerResponse_Resource - - for b, m := range fs.brokers { - if _, found := m[req.Resource]; found { - resp.Found = true - resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{ - { - GrpcAddresses: b, - ResourceCount: int32(len(m)), - }, - } - return - } - localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{ - GrpcAddresses: b, - ResourceCount: int32(len(m)), - }) - } - - resp.Resources = localBrokers - - return resp, nil - -} diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go new file mode 100644 index 000000000..df5b8fa1e --- /dev/null +++ b/weed/server/filer_grpc_server_admin.go @@ -0,0 +1,178 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" + "github.com/chrislusf/seaweedfs/weed/util" + "time" +) + +func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) { + + var output *master_pb.StatisticsResponse + + err 
= fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error { + grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{ + Replication: req.Replication, + Collection: req.Collection, + Ttl: req.Ttl, + DiskType: req.DiskType, + }) + if grpcErr != nil { + return grpcErr + } + + output = grpcResponse + return nil + }) + + if err != nil { + return nil, err + } + + return &filer_pb.StatisticsResponse{ + TotalSize: output.TotalSize, + UsedSize: output.UsedSize, + FileCount: output.FileCount, + }, nil +} + +func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (resp *filer_pb.PingResponse, pingErr error) { + resp = &filer_pb.PingResponse{ + StartTimeNs: time.Now().UnixNano(), + } + if req.TargetType == cluster.FilerType { + pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if req.TargetType == cluster.VolumeServerType { + pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if req.TargetType == cluster.MasterType { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if pingErr != nil { + pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr) + } + resp.StopTimeNs = time.Now().UnixNano() + return +} + +func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) { + + clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) + + t := &filer_pb.GetFilerConfigurationResponse{ + Masters: pb.ToAddressStringsFromMap(fs.option.Masters), + Collection: fs.option.Collection, + Replication: fs.option.DefaultReplication, + MaxMb: uint32(fs.option.MaxMB), + DirBuckets: fs.filer.DirBucketsPath, + Cipher: fs.filer.Cipher, + Signature: fs.filer.Signature, + MetricsAddress: fs.metricsAddress, + MetricsIntervalSec: int32(fs.metricsIntervalSec), + Version: util.Version(), + ClusterId: string(clusterId), + FilerGroup: fs.option.FilerGroup, + } + + glog.V(4).Infof("GetFilerConfiguration: %v", t) + + return t, nil +} + +func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error { + + req, err := stream.Recv() + if err != nil { + return err + } + + clientName := util.JoinHostPort(req.Name, int(req.GrpcPort)) + m := make(map[string]bool) + for _, tp := range req.Resources { + m[tp] = true + } + fs.brokersLock.Lock() + fs.brokers[clientName] = m + glog.V(0).Infof("+ broker %v", clientName) + fs.brokersLock.Unlock() + + defer func() { + fs.brokersLock.Lock() + delete(fs.brokers, clientName) + glog.V(0).Infof("- broker %v: %v", clientName, err) + fs.brokersLock.Unlock() + }() + + for { + if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil { + glog.V(0).Infof("send broker %v: %+v", clientName, err) + return err 
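The new `Ping` RPC above records three timestamps: `StartTimeNs` and `StopTimeNs` on the server handling the ping, and `RemoteTimeNs` reported by the target being pinged. Assuming those semantics, a caller could derive round-trip time and a rough clock-offset estimate like this (`pingStats` is a hypothetical helper, not part of the diff):

```go
package main

import (
	"fmt"
	"time"
)

// pingStats assumes startNs/stopNs are taken on the pinging server and
// remoteNs is the target's clock at the moment it handled the ping.
func pingStats(startNs, remoteNs, stopNs int64) (rtt, skew time.Duration) {
	rtt = time.Duration(stopNs - startNs)
	// Estimate the target's offset against the midpoint of the round trip.
	midpoint := startNs + (stopNs-startNs)/2
	skew = time.Duration(remoteNs - midpoint)
	return
}

func main() {
	rtt, skew := pingStats(1_000_000, 1_600_000, 2_000_000)
	fmt.Println(rtt, skew) // 1ms round trip, +100µs estimated offset
}
```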
+ } + // println("replied") + + if _, err := stream.Recv(); err != nil { + glog.V(0).Infof("recv broker %v: %v", clientName, err) + return err + } + // println("received") + } + +} + +func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) { + + resp = &filer_pb.LocateBrokerResponse{} + + fs.brokersLock.Lock() + defer fs.brokersLock.Unlock() + + var localBrokers []*filer_pb.LocateBrokerResponse_Resource + + for b, m := range fs.brokers { + if _, found := m[req.Resource]; found { + resp.Found = true + resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{ + { + GrpcAddresses: b, + ResourceCount: int32(len(m)), + }, + } + return + } + localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{ + GrpcAddresses: b, + ResourceCount: int32(len(m)), + }) + } + + resp.Resources = localBrokers + + return resp, nil + +} diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index 8064431c5..3be986023 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -3,20 +3,22 @@ package weed_server import ( "context" "fmt" + "strings" + "sync" + "time" + "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" - "strings" - "sync" - "time" ) -func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.DownloadToLocalRequest) (*filer_pb.DownloadToLocalResponse, error) { +func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req *filer_pb.CacheRemoteObjectToLocalClusterRequest) (*filer_pb.CacheRemoteObjectToLocalClusterResponse, error) { // load all mappings mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE)) @@ -56,14 +58,13 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo return nil, err } - resp := &filer_pb.DownloadToLocalResponse{} + resp := &filer_pb.CacheRemoteObjectToLocalClusterResponse{} if entry.Remote == nil || entry.Remote.RemoteSize == 0 { return resp, nil } // detect storage option - // replication level is set to "000" to ensure only need to ask one volume server to fetch the data. 
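The `DownloadToLocal` RPC above is renamed to the more descriptive `CacheRemoteObjectToLocalCluster`. A hedged sketch of invoking it from a filer client, using the `pb.WithFilerClient(false, ...)` signature this change introduces (directory and object name are placeholders):

```go
import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
	"google.golang.org/grpc"
)

// cacheToLocal asks the filer to pull a remote-mounted object into the
// local cluster so subsequent reads are served locally.
func cacheToLocal(filerAddress pb.ServerAddress, grpcDialOption grpc.DialOption) error {
	return pb.WithFilerClient(false, filerAddress, grpcDialOption,
		func(client filer_pb.SeaweedFilerClient) error {
			_, err := client.CacheRemoteObjectToLocalCluster(context.Background(),
				&filer_pb.CacheRemoteObjectToLocalClusterRequest{
					Directory: "/buckets/b1", // placeholder: a mounted remote directory
					Name:      "large-object.bin",
				})
			return err
		})
}
```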
- so, err := fs.detectStorageOption(req.Directory, "", "000", 0, "", "", "") + so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "") if err != nil { return resp, err } @@ -111,14 +112,26 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo return } + var replicas []*volume_server_pb.FetchAndWriteNeedleRequest_Replica + for _, r := range assignResult.Replicas { + replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{ + Url: r.Url, + PublicUrl: r.PublicUrl, + GrpcPort: int32(r.GrpcPort), + }) + } + // tell filer to tell volume server to download into needles - err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { + assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort) + err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ - VolumeId: uint32(fileId.VolumeId), - NeedleId: uint64(fileId.Key), - Cookie: uint32(fileId.Cookie), - Offset: localOffset, - Size: size, + VolumeId: uint32(fileId.VolumeId), + NeedleId: uint64(fileId.Key), + Cookie: uint32(fileId.Cookie), + Offset: localOffset, + Size: size, + Replicas: replicas, + Auth: string(assignResult.Auth), RemoteConf: storageConf, RemoteLocation: &remote_pb.RemoteStorageLocation{ Name: remoteStorageMountedLocation.Name, @@ -166,6 +179,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo // this skips meta data log events if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil { + fs.filer.DeleteChunks(chunks) return nil, err } fs.filer.DeleteChunks(garbage) diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go index 8a11c91e3..7d6650b53 100644 --- a/weed/server/filer_grpc_server_rename.go +++ b/weed/server/filer_grpc_server_rename.go @@ -4,6 +4,7 @@ import ( "context" "fmt" "path/filepath" + "time" "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/glog" @@ -33,7 +34,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err) } - moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, req.Signatures) + moveErr := fs.moveEntry(ctx, nil, oldParent, oldEntry, newParent, req.NewName, req.Signatures) if moveErr != nil { fs.filer.RollbackTransaction(ctx) return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr) @@ -47,11 +48,64 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom return &filer_pb.AtomicRenameEntryResponse{}, nil } -func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { +func (fs *FilerServer) StreamRenameEntry(req *filer_pb.StreamRenameEntryRequest, stream filer_pb.SeaweedFiler_StreamRenameEntryServer) (err error) { - if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error { + glog.V(1).Infof("StreamRenameEntry %v", req) + + oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory)) + newParent := util.FullPath(filepath.ToSlash(req.NewDirectory)) 
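`StreamRenameEntry`, added in the rename changes here, performs the same move as `AtomicRenameEntry` but sends one event per relocated entry, so a large directory move can be observed incrementally. A sketch of draining that stream on the client side (connection setup elided; directories are placeholders):

```go
import (
	"context"
	"io"
	"log"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func streamRename(client filer_pb.SeaweedFilerClient) error {
	stream, err := client.StreamRenameEntry(context.Background(),
		&filer_pb.StreamRenameEntryRequest{
			OldDirectory: "/dirA",
			OldName:      "photos",
			NewDirectory: "/dirB",
			NewName:      "photos",
		})
	if err != nil {
		return err
	}
	for {
		resp, err := stream.Recv()
		if err == io.EOF {
			return nil // server finished the move
		}
		if err != nil {
			return err
		}
		// One event per moved entry; the delete-side event has a nil NewEntry.
		log.Printf("event under %s for %s", resp.Directory, resp.EventNotification.OldEntry.Name)
	}
}
```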
+ + if err := fs.filer.CanRename(oldParent, newParent); err != nil { + return err + } + + ctx := context.Background() + + ctx, err = fs.filer.BeginTransaction(ctx) + if err != nil { + return err + } + + oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName)) + if err != nil { + fs.filer.RollbackTransaction(ctx) + return fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err) + } + + if oldEntry.IsDirectory() { + // follow https://pubs.opengroup.org/onlinepubs/000095399/functions/rename.html + targetDir := newParent.Child(req.NewName) + newEntry, err := fs.filer.FindEntry(ctx, targetDir) + if err == nil { + if !newEntry.IsDirectory() { + fs.filer.RollbackTransaction(ctx) + return fmt.Errorf("%s is not directory", targetDir) + } + if entries, _, _ := fs.filer.ListDirectoryEntries(context.Background(), targetDir, "", false, 1, "", "", ""); len(entries) > 0 { + return fmt.Errorf("%s is not empty", targetDir) + } + } + } + + moveErr := fs.moveEntry(ctx, stream, oldParent, oldEntry, newParent, req.NewName, req.Signatures) + if moveErr != nil { + fs.filer.RollbackTransaction(ctx) + return fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr) + } else { + if commitError := fs.filer.CommitTransaction(ctx); commitError != nil { + fs.filer.RollbackTransaction(ctx) + return fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError) + } + } + + return nil +} + +func (fs *FilerServer) moveEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { + + if err := fs.moveSelfEntry(ctx, stream, oldParent, entry, newParent, newName, func() error { if entry.IsDirectory() { - if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, signatures); err != nil { + if err := fs.moveFolderSubEntries(ctx, stream, oldParent, entry, newParent, newName, signatures); err != nil { return err } } @@ -63,7 +117,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e return nil } -func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { +func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error { currentDirPath := oldParent.Child(entry.Name()) newDirPath := newParent.Child(newName) @@ -84,7 +138,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. for _, item := range entries { lastFileName = item.Name() // println("processing", lastFileName) - err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), signatures) + err := fs.moveEntry(ctx, stream, currentDirPath, item, newDirPath, item.Name(), signatures) if err != nil { return err } @@ -96,7 +150,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util. 
return nil } -func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error { +func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error { oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName) @@ -109,15 +163,37 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat // add to new directory newEntry := &filer.Entry{ - FullPath: newPath, - Attr: entry.Attr, - Chunks: entry.Chunks, - Extended: entry.Extended, - Content: entry.Content, + FullPath: newPath, + Attr: entry.Attr, + Chunks: entry.Chunks, + Extended: entry.Extended, + Content: entry.Content, + HardLinkCounter: entry.HardLinkCounter, + HardLinkId: entry.HardLinkId, + Remote: entry.Remote, + Quota: entry.Quota, } - if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures); createErr != nil { + if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures, false); createErr != nil { return createErr } + if stream != nil { + if err := stream.Send(&filer_pb.StreamRenameEntryResponse{ + Directory: string(oldParent), + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{ + Name: entry.Name(), + }, + NewEntry: newEntry.ToProtoEntry(), + DeleteChunks: false, + NewParentPath: string(newParent), + IsFromOtherCluster: false, + Signatures: nil, + }, + TsNs: time.Now().UnixNano(), + }); err != nil { + return err + } + } if moveFolderSubEntries != nil { if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil { @@ -130,6 +206,24 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat if deleteErr != nil { return deleteErr } + if stream != nil { + if err := stream.Send(&filer_pb.StreamRenameEntryResponse{ + Directory: string(oldParent), + EventNotification: &filer_pb.EventNotification{ + OldEntry: &filer_pb.Entry{ + Name: entry.Name(), + }, + NewEntry: nil, + DeleteChunks: false, + NewParentPath: "", + IsFromOtherCluster: false, + Signatures: nil, + }, + TsNs: time.Now().UnixNano(), + }); err != nil { + return err + } + } return nil diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 3fdac1b26..da710234b 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -2,6 +2,7 @@ package weed_server import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/stats" "strings" "time" @@ -23,9 +24,11 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, peerAddress := findClientAddress(stream.Context(), 0) - clientName := fs.addClient(req.ClientName, peerAddress) - - defer fs.deleteClient(clientName) + alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId) + if alreadyKnown { + return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId) + } + defer fs.deleteClient(clientName, req.ClientId) lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) @@ -37,28 +40,27 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, var processedTsNs int64 var readPersistedLogErr error var 
readInMemoryLogErr error + var isDone bool for { glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn) if readPersistedLogErr != nil { return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr) } + if isDone { + return nil + } if processedTsNs != 0 { lastReadTime = time.Unix(0, processedTsNs) - } else { - if readInMemoryLogErr == log_buffer.ResumeFromDiskError { - time.Sleep(1127 * time.Millisecond) - continue - } } glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool { + lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { fs.filer.MetaAggregator.ListenersLock.Lock() fs.filer.MetaAggregator.ListenersCond.Wait() fs.filer.MetaAggregator.ListenersLock.Unlock() @@ -73,6 +75,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest, break } } + if isDone { + return nil + } time.Sleep(1127 * time.Millisecond) } @@ -85,9 +90,11 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq peerAddress := findClientAddress(stream.Context(), 0) - clientName := fs.addClient(req.ClientName, peerAddress) - - defer fs.deleteClient(clientName) + alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId) + if alreadyKnown { + return fmt.Errorf("duplicated local subscription detected for client %s id %d", clientName, req.ClientId) + } + defer fs.deleteClient(clientName, req.ClientId) lastReadTime := time.Unix(0, req.SinceNs) glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) @@ -99,14 +106,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq var processedTsNs int64 var readPersistedLogErr error var readInMemoryLogErr error + var isDone bool for { // println("reading from persisted logs ...") glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn) + processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn) if readPersistedLogErr != nil { + glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr) return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr) } + if isDone { + return nil + } if processedTsNs != 0 { lastReadTime = time.Unix(0, processedTsNs) @@ -119,7 +131,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime) - lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool { + lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool { fs.listenersLock.Lock() 
fs.listenersCond.Wait() fs.listenersLock.Unlock() @@ -130,11 +142,13 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq continue } glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr) - time.Sleep(1127 * time.Millisecond) if readInMemoryLogErr != log_buffer.ResumeError { break } } + if isDone { + return nil + } } return readInMemoryLogErr @@ -201,17 +215,24 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe return nil } - if !strings.HasPrefix(fullpath, req.PathPrefix) { - if eventNotification.NewParentPath != "" { - newFullPath := util.Join(eventNotification.NewParentPath, entryName) - if !strings.HasPrefix(newFullPath, req.PathPrefix) { + if hasPrefixIn(fullpath, req.PathPrefixes) { + // good + } else { + if !strings.HasPrefix(fullpath, req.PathPrefix) { + if eventNotification.NewParentPath != "" { + newFullPath := util.Join(eventNotification.NewParentPath, entryName) + if !strings.HasPrefix(newFullPath, req.PathPrefix) { + return nil + } + } else { return nil } - } else { - return nil } } + // collect timestamps for path + stats.FilerServerLastSendTsOfSubscribeGauge.WithLabelValues(fs.option.Host.String(), req.ClientName, req.PathPrefix).Set(float64(tsNs)) + message := &filer_pb.SubscribeMetadataResponse{ Directory: dirPath, EventNotification: eventNotification, @@ -227,12 +248,31 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe } } -func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) { +func hasPrefixIn(text string, prefixes []string) bool { + for _, p := range prefixes { + if strings.HasPrefix(text, p) { + return true + } + } + return false +} + +func (fs *FilerServer) addClient(clientType string, clientAddress string, clientId int32) (alreadyKnown bool, clientName string) { clientName = clientType + "@" + clientAddress glog.V(0).Infof("+ listener %v", clientName) + if clientId != 0 { + fs.knownListenersLock.Lock() + _, alreadyKnown = fs.knownListeners[clientId] + fs.knownListenersLock.Unlock() + } return } -func (fs *FilerServer) deleteClient(clientName string) { +func (fs *FilerServer) deleteClient(clientName string, clientId int32) { glog.V(0).Infof("- listener %v", clientName) + if clientId != 0 { + fs.knownListenersLock.Lock() + delete(fs.knownListeners, clientId) + fs.knownListenersLock.Unlock() + } } diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 534bc4840..6bf0261ee 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -16,10 +16,12 @@ import ( "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "github.com/chrislusf/seaweedfs/weed/util" "github.com/chrislusf/seaweedfs/weed/filer" + _ "github.com/chrislusf/seaweedfs/weed/filer/arangodb" _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra" _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7" _ "github.com/chrislusf/seaweedfs/weed/filer/etcd" @@ -34,7 +36,9 @@ import ( _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2" _ "github.com/chrislusf/seaweedfs/weed/filer/redis" _ "github.com/chrislusf/seaweedfs/weed/filer/redis2" + _ "github.com/chrislusf/seaweedfs/weed/filer/redis3" _ "github.com/chrislusf/seaweedfs/weed/filer/sqlite" + _ "github.com/chrislusf/seaweedfs/weed/filer/ydb" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/notification" 
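The metadata subscription changes above add a `ClientId` so the filer can reject duplicate subscriptions, and an `UntilNs` bound so a read can terminate instead of streaming forever. A sketch of a request using those fields (all values are placeholders):

```go
import (
	"context"
	"math/rand"
	"time"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func subscribe(client filer_pb.SeaweedFilerClient) error {
	stream, err := client.SubscribeMetadata(context.Background(),
		&filer_pb.SubscribeMetadataRequest{
			ClientName: "backup-worker",
			ClientId:   rand.Int31(), // non-zero lets the filer detect a duplicate subscription
			PathPrefix: "/buckets/",
			SinceNs:    time.Now().Add(-time.Hour).UnixNano(),
			UntilNs:    0, // 0 keeps streaming; a bound makes the server return once logs pass it
		})
	if err != nil {
		return err
	}
	for {
		resp, err := stream.Recv()
		if err != nil {
			return err
		}
		_ = resp // handle resp.EventNotification here
	}
}
```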
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs" @@ -46,7 +50,8 @@ import ( ) type FilerOption struct { - Masters []string + Masters map[string]pb.ServerAddress + FilerGroup string Collection string DefaultReplication string DisableDirListing bool @@ -54,21 +59,23 @@ type FilerOption struct { DirListingLimit int DataCenter string Rack string + DataNode string DefaultLevelDbDir string DisableHttp bool - Host string - Port uint32 + Host pb.ServerAddress recursiveDelete bool Cipher bool SaveToFilerLimit int64 - Filers []string ConcurrentUploadLimit int64 + ShowUIDirectoryDelete bool } type FilerServer struct { + filer_pb.UnimplementedSeaweedFilerServer option *FilerOption secret security.SigningKey filer *filer.Filer + filerGuard *security.Guard grpcDialOption grpc.DialOption // metrics read from the master @@ -79,6 +86,10 @@ type FilerServer struct { listenersLock sync.Mutex listenersCond *sync.Cond + // track known metadata listeners + knownListenersLock sync.Mutex + knownListeners map[int32]struct{} + brokers map[string]map[string]bool brokersLock sync.Mutex @@ -88,9 +99,19 @@ type FilerServer struct { func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) { + v := util.GetViper() + signingKey := v.GetString("jwt.filer_signing.key") + v.SetDefault("jwt.filer_signing.expires_after_seconds", 10) + expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds") + + readSigningKey := v.GetString("jwt.filer_signing.read.key") + v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60) + readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds") + fs = &FilerServer{ option: option, grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"), + knownListeners: make(map[int32]struct{}), brokers: make(map[string]map[string]bool), inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)), } @@ -100,20 +121,21 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) glog.Fatal("master list is required!") } - fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() { + fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() { fs.listenersCond.Broadcast() }) fs.filer.Cipher = option.Cipher + // we do not support IP whitelist right now + fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec) fs.checkWithMaster() - go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec) - go fs.filer.KeepConnectedToMaster() + go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec) + go fs.filer.KeepMasterClientConnected() - v := util.GetViper() if !util.LoadConfiguration("filer", false) { - v.Set("leveldb2.enabled", true) - v.Set("leveldb2.dir", option.DefaultLevelDbDir) + v.SetDefault("leveldb2.enabled", true) + v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir) _, err := os.Stat(option.DefaultLevelDbDir) if os.IsNotExist(err) { os.MkdirAll(option.DefaultLevelDbDir, 0755) @@ -130,7 +152,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) // TODO deprecated, will be be removed after 2020-12-31 // replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration // 
fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync") - fs.filer.LoadConfiguration(v) + isFresh := fs.filer.LoadConfiguration(v) notification.LoadConfiguration(v, "notification.") @@ -143,9 +165,15 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers) - - fs.filer.LoadBuckets() + existingNodes := fs.filer.ListExistingPeerUpdates() + startFromTime := time.Now().Add(-filer.LogFlushInterval) + if isFresh { + glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes) + if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err != nil { + glog.Fatalf("%s bootstrap from %+v", option.Host, existingNodes) + } + } + fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime) fs.filer.LoadFilerConf() @@ -160,17 +188,10 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) func (fs *FilerServer) checkWithMaster() { - for _, master := range fs.option.Masters { - _, err := pb.ParseServerToGrpcAddress(master) - if err != nil { - glog.Fatalf("invalid master address %s: %v", master, err) - } - } - isConnected := false for !isConnected { for _, master := range fs.option.Masters { - readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", master, err) diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go index 118646a04..6f0d0b7ca 100644 --- a/weed/server/filer_server_handlers.go +++ b/weed/server/filer_server_handlers.go @@ -1,7 +1,9 @@ package weed_server import ( + "errors" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/util" "net/http" "strings" @@ -15,6 +17,19 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { start := time.Now() + if r.Method == "OPTIONS" { + stats.FilerRequestCounter.WithLabelValues("options").Inc() + OptionsHandler(w, r, false) + stats.FilerRequestHistogram.WithLabelValues("options").Observe(time.Since(start).Seconds()) + return + } + + isReadHttpCall := r.Method == "GET" || r.Method == "HEAD" + if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) { + writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) + return + } + // proxy to volume servers var fileId string if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") { @@ -78,20 +93,31 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) { fs.PostHandler(w, r, contentLength) stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds()) } - case "OPTIONS": - stats.FilerRequestCounter.WithLabelValues("options").Inc() - OptionsHandler(w, r, false) - stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds()) } } func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) { + + start := time.Now() + + // We handle OPTIONS first because it never should be authenticated + if r.Method == "OPTIONS" { + 
stats.FilerRequestCounter.WithLabelValues("options").Inc() + OptionsHandler(w, r, true) + stats.FilerRequestHistogram.WithLabelValues("options").Observe(time.Since(start).Seconds()) + return + } + + if !fs.maybeCheckJwtAuthorization(r, false) { + writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt")) + return + } + w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION) if r.Header.Get("Origin") != "" { w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Credentials", "true") } - start := time.Now() switch r.Method { case "GET": stats.FilerRequestCounter.WithLabelValues("get").Inc() @@ -101,10 +127,6 @@ func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Reque stats.FilerRequestCounter.WithLabelValues("head").Inc() fs.GetOrHeadHandler(w, r) stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds()) - case "OPTIONS": - stats.FilerRequestCounter.WithLabelValues("options").Inc() - OptionsHandler(w, r, true) - stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds()) } } @@ -116,3 +138,41 @@ func OptionsHandler(w http.ResponseWriter, r *http.Request, isReadOnly bool) { } w.Header().Add("Access-Control-Allow-Headers", "*") } + +// maybeCheckJwtAuthorization returns true if access should be granted, false if it should be denied +func (fs *FilerServer) maybeCheckJwtAuthorization(r *http.Request, isWrite bool) bool { + + var signingKey security.SigningKey + + if isWrite { + if len(fs.filerGuard.SigningKey) == 0 { + return true + } else { + signingKey = fs.filerGuard.SigningKey + } + } else { + if len(fs.filerGuard.ReadSigningKey) == 0 { + return true + } else { + signingKey = fs.filerGuard.ReadSigningKey + } + } + + tokenStr := security.GetJwt(r) + if tokenStr == "" { + glog.V(1).Infof("missing jwt from %s", r.RemoteAddr) + return false + } + + token, err := security.DecodeJwt(signingKey, tokenStr, &security.SeaweedFilerClaims{}) + if err != nil { + glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err) + return false + } + if !token.Valid { + glog.V(1).Infof("jwt invalid from %s: %v", r.RemoteAddr, tokenStr) + return false + } else { + return true + } +} diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go index b8b28790b..301d609ec 100644 --- a/weed/server/filer_server_handlers_proxy.go +++ b/weed/server/filer_server_handlers_proxy.go @@ -3,6 +3,7 @@ package weed_server import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/seaweedfs/weed/util/mem" "io" "math/rand" "net/http" @@ -62,6 +63,9 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques w.Header()[k] = v } w.WriteHeader(proxyResponse.StatusCode) - io.Copy(w, proxyResponse.Body) + + buf := mem.Allocate(128 * 1024) + defer mem.Free(buf) + io.CopyBuffer(w, proxyResponse.Body, buf) } diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 054a1bd00..28573f7b3 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -4,10 +4,12 @@ import ( "bytes" "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" + "github.com/chrislusf/seaweedfs/weed/util/mem" "io" + "math" "mime" "net/http" - "net/url" "path/filepath" "strconv" "strings" @@ -17,11 +19,67 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" 
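The handler changes above deny any non-OPTIONS request that lacks a valid JWT whenever the corresponding signing key is configured. Assuming the usual security.toml layout behind the viper keys read in `NewFilerServer` (`jwt.filer_signing.key` and friends), the configuration would look roughly like:

```toml
# security.toml — assumed layout matching the viper keys above
[jwt.filer_signing]
key = "write-signing-secret"       # a non-empty key turns on JWT checks for writes
expires_after_seconds = 10

[jwt.filer_signing.read]
key = "read-signing-secret"        # optional: also require a read JWT on GET/HEAD
expires_after_seconds = 60
```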
"github.com/chrislusf/seaweedfs/weed/images" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" ) +// Validates the preconditions. Returns true if GET/HEAD operation should not proceed. +// Preconditions supported are: +// If-Modified-Since +// If-Unmodified-Since +// If-Match +// If-None-Match +func checkPreconditions(w http.ResponseWriter, r *http.Request, entry *filer.Entry) bool { + + etag := filer.ETagEntry(entry) + /// When more than one conditional request header field is present in a + /// request, the order in which the fields are evaluated becomes + /// important. In practice, the fields defined in this document are + /// consistently implemented in a single, logical order, since "lost + /// update" preconditions have more strict requirements than cache + /// validation, a validated cache is more efficient than a partial + /// response, and entity tags are presumed to be more accurate than date + /// validators. https://tools.ietf.org/html/rfc7232#section-5 + if entry.Attr.Mtime.IsZero() { + return false + } + w.Header().Set("Last-Modified", entry.Attr.Mtime.UTC().Format(http.TimeFormat)) + + ifMatchETagHeader := r.Header.Get("If-Match") + ifUnmodifiedSinceHeader := r.Header.Get("If-Unmodified-Since") + if ifMatchETagHeader != "" { + if util.CanonicalizeETag(etag) != util.CanonicalizeETag(ifMatchETagHeader) { + w.WriteHeader(http.StatusPreconditionFailed) + return true + } + } else if ifUnmodifiedSinceHeader != "" { + if t, parseError := time.Parse(http.TimeFormat, ifUnmodifiedSinceHeader); parseError == nil { + if t.Before(entry.Attr.Mtime) { + w.WriteHeader(http.StatusPreconditionFailed) + return true + } + } + } + + ifNoneMatchETagHeader := r.Header.Get("If-None-Match") + ifModifiedSinceHeader := r.Header.Get("If-Modified-Since") + if ifNoneMatchETagHeader != "" { + if util.CanonicalizeETag(etag) == util.CanonicalizeETag(ifNoneMatchETagHeader) { + w.WriteHeader(http.StatusNotModified) + return true + } + } else if ifModifiedSinceHeader != "" { + if t, parseError := time.Parse(http.TimeFormat, ifModifiedSinceHeader); parseError == nil { + if t.After(entry.Attr.Mtime) { + w.WriteHeader(http.StatusNotModified) + return true + } + } + } + + return false +} + func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { path := r.URL.Path @@ -38,11 +96,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } if err == filer_pb.ErrNotFound { glog.V(1).Infof("Not found %s: %v", path, err) - stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc() + stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadNotFound).Inc() w.WriteHeader(http.StatusNotFound) } else { glog.Errorf("Internal %s: %v", path, err) - stats.FilerRequestCounter.WithLabelValues("read.internalerror").Inc() + stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadInternal).Inc() w.WriteHeader(http.StatusInternalServerError) } return @@ -62,10 +120,22 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) return } - // set etag + query := r.URL.Query() + if query.Get("metadata") == "true" { + if query.Get("resolveManifest") == "true" { + if entry.Chunks, _, err = filer.ResolveChunkManifest( + fs.filer.MasterClient.GetLookupFileIdFunction(), + entry.Chunks, 0, math.MaxInt64); err != nil { + err = fmt.Errorf("failed to resolve chunk manifest, err: %s", err.Error()) + writeJsonError(w, r, 
http.StatusInternalServerError, err) + } + } + writeJsonQuiet(w, r, http.StatusOK, entry) + return + } + etag := filer.ETagEntry(entry) - if ifm := r.Header.Get("If-Match"); ifm != "" && ifm != "\""+etag+"\"" { - w.WriteHeader(http.StatusPreconditionFailed) + if checkPreconditions(w, r, entry) { return } @@ -82,22 +152,12 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) w.Header().Set("Content-Type", mimeType) } - // if modified since - if !entry.Attr.Mtime.IsZero() { - w.Header().Set("Last-Modified", entry.Attr.Mtime.UTC().Format(http.TimeFormat)) - if r.Header.Get("If-Modified-Since") != "" { - if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil { - if !t.Before(entry.Attr.Mtime) { - w.WriteHeader(http.StatusNotModified) - return - } - } - } - } - // print out the header from extended properties for k, v := range entry.Extended { - w.Header().Set(k, string(v)) + if !strings.HasPrefix(k, "xattr-") { + // "xattr-" prefix is set in filesys.XATTR_PREFIX + w.Header().Set(k, string(v)) + } } //Seaweed custom header are not visible to Vue or javascript @@ -111,27 +171,20 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) w.Header().Set("Access-Control-Expose-Headers", strings.Join(seaweedHeaders, ",")) //set tag count - if r.Method == "GET" { - tagCount := 0 - for k := range entry.Extended { - if strings.HasPrefix(k, xhttp.AmzObjectTagging+"-") { - tagCount++ - } - } - if tagCount > 0 { - w.Header().Set(xhttp.AmzTagCount, strconv.Itoa(tagCount)) + tagCount := 0 + for k := range entry.Extended { + if strings.HasPrefix(k, s3_constants.AmzObjectTagging+"-") { + tagCount++ } } - - if inm := r.Header.Get("If-None-Match"); inm == "\""+etag+"\"" { - w.WriteHeader(http.StatusNotModified) - return + if tagCount > 0 { + w.Header().Set(s3_constants.AmzTagCount, strconv.Itoa(tagCount)) } + setEtag(w, etag) filename := entry.Name() - filename = url.QueryEscape(filename) - adjustHeaderContentDisposition(w, r, filename) + adjustPassthroughHeaders(w, r, filename) totalSize := int64(entry.Size()) @@ -147,10 +200,12 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) } width, height, mode, shouldResize := shouldResizeImages(ext, r) if shouldResize { - data, err := filer.ReadAll(fs.filer.MasterClient, entry.Chunks) + data := mem.Allocate(int(totalSize)) + defer mem.Free(data) + err := filer.ReadAll(data, fs.filer.MasterClient, entry.Chunks) if err != nil { glog.Errorf("failed to read %s: %v", path, err) - w.WriteHeader(http.StatusNotModified) + w.WriteHeader(http.StatusInternalServerError) return } rs, _, _ := images.Resized(ext, bytes.NewReader(data), width, height, mode) @@ -163,6 +218,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) if offset+size <= int64(len(entry.Content)) { _, err := writer.Write(entry.Content[offset : offset+size]) if err != nil { + stats.FilerRequestCounter.WithLabelValues(stats.ErrorWriteEntry).Inc() glog.Errorf("failed to write entry content: %v", err) } return err @@ -170,10 +226,12 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) chunks := entry.Chunks if entry.IsInRemoteOnly() { dir, name := entry.FullPath.DirAndName() - if resp, err := fs.DownloadToLocal(context.Background(), &filer_pb.DownloadToLocalRequest{ + if resp, err := fs.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{ Directory: dir, Name: name, }); err 
!= nil { + stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadCache).Inc() + glog.Errorf("CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err) return fmt.Errorf("cache %s: %v", entry.FullPath, err) } else { chunks = resp.Entry.Chunks @@ -182,6 +240,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) err = filer.StreamContent(fs.filer.MasterClient, writer, chunks, offset, size) if err != nil { + stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc() glog.Errorf("failed to stream content %s: %v", r.URL, err) } return err diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index 307c411b6..eaf17fa18 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -2,9 +2,6 @@ package weed_server import ( "context" - "encoding/base64" - "fmt" - "github.com/skip2/go-qrcode" "net/http" "strconv" "strings" @@ -49,8 +46,10 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque path = "" } + emptyFolder := true if len(entries) > 0 { lastFileName = entries[len(entries)-1].Name() + emptyFolder = false } glog.V(4).Infof("listDirectory %s, last file %s, limit %d: %d items", path, lastFileName, limit, len(entries)) @@ -62,30 +61,27 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque Limit int LastFileName string ShouldDisplayLoadMore bool + EmptyFolder bool }{ path, entries, limit, lastFileName, shouldDisplayLoadMore, + emptyFolder, }) return } - var qrImageString string - img, err := qrcode.Encode(fmt.Sprintf("http://%s:%d%s", fs.option.Host, fs.option.Port, r.URL.Path), qrcode.Medium, 128) - if err == nil { - qrImageString = base64.StdEncoding.EncodeToString(img) - } - - ui.StatusTpl.Execute(w, struct { + err = ui.StatusTpl.Execute(w, struct { Path string Breadcrumbs []ui.Breadcrumb Entries interface{} Limit int LastFileName string ShouldDisplayLoadMore bool - QrImage string + EmptyFolder bool + ShowDirectoryDelete bool }{ path, ui.ToBreadcrumb(path), @@ -93,6 +89,10 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque limit, lastFileName, shouldDisplayLoadMore, - qrImageString, + emptyFolder, + fs.option.ShowUIDirectoryDelete, }) + if err != nil { + glog.V(0).Infof("Template Execute Error: %v", err) + } } diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go index 70b5327d6..ae2093947 100644 --- a/weed/server/filer_server_handlers_tagging.go +++ b/weed/server/filer_server_handlers_tagging.go @@ -43,7 +43,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) } } - if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, false); dbErr != nil { glog.V(0).Infof("failing to update %s tagging : %v", path, dbErr) writeJsonError(w, r, http.StatusInternalServerError, err) return @@ -82,7 +82,9 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque toDelete := strings.Split(r.URL.Query().Get("tagging"), ",") deletions := make(map[string]struct{}) for _, deletion := range toDelete { - deletions[deletion] = struct{}{} + if deletion != "" { + deletions[deletion] = struct{}{} + } } // delete all tags or specific tags @@ -107,7 +109,7 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque return } - if dbErr := 
fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
+	if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, false); dbErr != nil {
 		glog.V(0).Infof("failing to delete %s tagging : %v", path, dbErr)
 		writeJsonError(w, r, http.StatusInternalServerError, err)
 		return
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 39d983ab7..bbaf28aa8 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -3,6 +3,8 @@ package weed_server
 import (
 	"context"
 	"errors"
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
 	"net/http"
 	"os"
 	"strings"
@@ -57,14 +59,21 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
 	ctx := context.Background()

+	destination := r.RequestURI
+	if finalDestination := r.Header.Get(s3_constants.SeaweedStorageDestinationHeader); finalDestination != "" {
+		destination = finalDestination
+	}
+
 	query := r.URL.Query()
-	so, err := fs.detectStorageOption0(r.RequestURI,
+	so, err := fs.detectStorageOption0(destination,
 		query.Get("collection"),
 		query.Get("replication"),
 		query.Get("ttl"),
 		query.Get("disk"),
+		query.Get("fsync"),
 		query.Get("dataCenter"),
 		query.Get("rack"),
+		query.Get("dataNode"),
 	)
 	if err != nil {
 		if err == ErrReadOnly {
@@ -76,11 +85,78 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
 		return
 	}

-	fs.autoChunk(ctx, w, r, contentLength, so)
+	if query.Has("mv.from") {
+		fs.move(ctx, w, r, so)
+	} else {
+		fs.autoChunk(ctx, w, r, contentLength, so)
+	}
+	util.CloseRequest(r)
 }

+func (fs *FilerServer) move(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
+	src := r.URL.Query().Get("mv.from")
+	dst := r.URL.Path
+
+	glog.V(2).Infof("FilerServer.move %v to %v", src, dst)
+
+	var err error
+	if src, err = clearName(src); err != nil {
+		writeJsonError(w, r, http.StatusBadRequest, err)
+		return
+	}
+	if dst, err = clearName(dst); err != nil {
+		writeJsonError(w, r, http.StatusBadRequest, err)
+		return
+	}
+	src = strings.TrimRight(src, "/")
+	if src == "" {
+		err = fmt.Errorf("invalid source '/'")
+		writeJsonError(w, r, http.StatusBadRequest, err)
+		return
+	}
+
+	srcPath := util.FullPath(src)
+	dstPath := util.FullPath(dst)
+	srcEntry, err := fs.filer.FindEntry(ctx, srcPath)
+	if err != nil {
+		err = fmt.Errorf("failed to get src entry '%s', err: %s", src, err)
+		writeJsonError(w, r, http.StatusBadRequest, err)
+		return
+	}
+
+	oldDir, oldName := srcPath.DirAndName()
+	newDir, newName := dstPath.DirAndName()
+	newName = util.Nvl(newName, oldName)
+
+	dstEntry, err := fs.filer.FindEntry(ctx, util.FullPath(strings.TrimRight(dst, "/")))
+	if err != nil && err != filer_pb.ErrNotFound {
+		err = fmt.Errorf("failed to get dst entry '%s', err: %s", dst, err)
+		writeJsonError(w, r, http.StatusInternalServerError, err)
+		return
+	}
+	if err == nil && !dstEntry.IsDirectory() && srcEntry.IsDirectory() {
+		err = fmt.Errorf("move: cannot overwrite non-directory '%s' with directory '%s'", dst, src)
+		writeJsonError(w, r, http.StatusBadRequest, err)
+		return
+	}
+
+	_, err = fs.AtomicRenameEntry(ctx, &filer_pb.AtomicRenameEntryRequest{
+		OldDirectory: oldDir,
+		OldName:      oldName,
+		NewDirectory: newDir,
+		NewName:      newName,
+	})
+	if err != nil {
+		err = fmt.Errorf("failed to move entry from '%s' to '%s', err: %s", src, dst, err)
+		writeJsonError(w, r, http.StatusBadRequest, err)
+		return
+	}
+
+	w.WriteHeader(http.StatusNoContent)
+}
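The new move handler turns a POST into a server-side rename: the request path is the destination, mv.from names the source, and success answers 204 No Content. A minimal client-side sketch, assuming a filer listening on localhost:8888 and placeholder paths:

package main

import (
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// rename /buckets/b/old.txt to /buckets/b/new.txt on a local filer
	src := "/buckets/b/old.txt"
	dst := "http://localhost:8888/buckets/b/new.txt?mv.from=" + url.QueryEscape(src)

	resp, err := http.Post(dst, "", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // expect "204 No Content"
}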
+ // curl -X DELETE http://localhost:8888/path/to // curl -X DELETE http://localhost:8888/path/to?recursive=true // curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true @@ -115,7 +191,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } -func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack string) (*operation.StorageOption, error) { +func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) { rule := fs.filer.FilerConf.MatchStorageRule(requestURI) @@ -124,10 +200,9 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication } // required by buckets folder - bucketDefaultCollection, bucketDefaultReplication, fsync := "", "", false + bucketDefaultCollection := "" if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") { bucketDefaultCollection = fs.filer.DetectBucket(util.FullPath(requestURI)) - bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(bucketDefaultCollection) } if ttlSeconds == 0 { @@ -139,23 +214,33 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication } return &operation.StorageOption{ - Replication: util.Nvl(qReplication, rule.Replication, bucketDefaultReplication, fs.option.DefaultReplication), + Replication: util.Nvl(qReplication, rule.Replication, fs.option.DefaultReplication), Collection: util.Nvl(qCollection, rule.Collection, bucketDefaultCollection, fs.option.Collection), - DataCenter: util.Nvl(dataCenter, fs.option.DataCenter), - Rack: util.Nvl(rack, fs.option.Rack), + DataCenter: util.Nvl(dataCenter, rule.DataCenter, fs.option.DataCenter), + Rack: util.Nvl(rack, rule.Rack, fs.option.Rack), + DataNode: util.Nvl(dataNode, rule.DataNode, fs.option.DataNode), TtlSeconds: ttlSeconds, DiskType: util.Nvl(diskType, rule.DiskType), - Fsync: fsync || rule.Fsync, + Fsync: rule.Fsync, VolumeGrowthCount: rule.VolumeGrowthCount, }, nil } -func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) (*operation.StorageOption, error) { +func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode string) (*operation.StorageOption, error) { ttl, err := needle.ReadTTL(qTtl) if err != nil { glog.Errorf("fail to parse ttl %s: %v", qTtl, err) } - return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack) + so, err := fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode) + if so != nil { + if fsync == "false" { + so.Fsync = false + } else if fsync == "true" { + so.Fsync = true + } + } + + return so, err } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index a42e0fc97..9c2b9959f 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "io" "net/http" "os" @@ -15,7 +16,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" 
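// The fsync query parameter handled in detectStorageOption0 above is
// effectively tri-state: "true" and "false" override the filer-conf rule,
// while anything else (including absence) leaves rule.Fsync untouched.
// A compact standalone illustration of that resolution order:
package main

import "fmt"

func resolveFsync(query string, ruleFsync bool) bool {
	switch query {
	case "true":
		return true
	case "false":
		return false
	default:
		return ruleFsync // unset or unparseable: defer to the storage rule
	}
}

func main() {
	fmt.Println(resolveFsync("", true))      // true  -- rule wins
	fmt.Println(resolveFsync("false", true)) // false -- explicit override
	fmt.Println(resolveFsync("true", false)) // true
}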
"github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" @@ -62,7 +62,8 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r * } } else if reply != nil { if len(md5bytes) > 0 { - w.Header().Set("Content-MD5", util.Base64Encode(md5bytes)) + md5InBase64 := util.Base64Encode(md5bytes) + w.Header().Set("Content-MD5", md5InBase64) } writeJsonQuiet(w, r, http.StatusCreated, reply) } @@ -96,6 +97,9 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite md5bytes = md5Hash.Sum(nil) filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent) + if replyerr != nil { + fs.filer.DeleteChunks(fileChunks) + } return } @@ -115,6 +119,9 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter md5bytes = md5Hash.Sum(nil) filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent) + if replyerr != nil { + fs.filer.DeleteChunks(fileChunks) + } return } @@ -123,6 +130,10 @@ func isAppend(r *http.Request) bool { return r.URL.Query().Get("op") == "append" } +func skipCheckParentDirEntry(r *http.Request) bool { + return r.URL.Query().Get("skipCheckParentDir") == "true" +} + func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) { // detect file mode @@ -153,9 +164,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } var entry *filer.Entry + var newChunks []*filer_pb.FileChunk var mergedChunks []*filer_pb.FileChunk + + isAppend := isAppend(r) + isOffsetWrite := len(fileChunks) > 0 && fileChunks[0].Offset > 0 // when it is an append - if isAppend(r) { + if isAppend || isOffsetWrite { existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path)) if findErr != nil && findErr != filer_pb.ErrNotFound { glog.V(0).Infof("failing to find %s: %v", path, findErr) @@ -166,11 +181,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa entry.Mtime = time.Now() entry.Md5 = nil // adjust chunk offsets - for _, chunk := range fileChunks { - chunk.Offset += int64(entry.FileSize) + if isAppend { + for _, chunk := range fileChunks { + chunk.Offset += int64(entry.FileSize) + } + entry.FileSize += uint64(chunkOffset) } - mergedChunks = append(entry.Chunks, fileChunks...) - entry.FileSize += uint64(chunkOffset) + newChunks = append(entry.Chunks, fileChunks...) 
// TODO if len(entry.Content) > 0 { @@ -180,27 +197,31 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa } else { glog.V(4).Infoln("saving", path) - mergedChunks = fileChunks + newChunks = fileChunks entry = &filer.Entry{ FullPath: util.FullPath(path), Attr: filer.Attr{ - Mtime: time.Now(), - Crtime: time.Now(), - Mode: os.FileMode(mode), - Uid: OS_UID, - Gid: OS_GID, - Replication: so.Replication, - Collection: so.Collection, - TtlSec: so.TtlSeconds, - DiskType: so.DiskType, - Mime: contentType, - Md5: md5bytes, - FileSize: uint64(chunkOffset), + Mtime: time.Now(), + Crtime: time.Now(), + Mode: os.FileMode(mode), + Uid: OS_UID, + Gid: OS_GID, + TtlSec: so.TtlSeconds, + Mime: contentType, + Md5: md5bytes, + FileSize: uint64(chunkOffset), }, Content: content, } } + // maybe concatenate small chunks into one whole chunk + mergedChunks, replyerr = fs.maybeMergeChunks(so, newChunks) + if replyerr != nil { + glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr) + mergedChunks = newChunks + } + // maybe compact entry chunks mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks) if replyerr != nil { @@ -208,6 +229,10 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa return } entry.Chunks = mergedChunks + if isOffsetWrite { + entry.Md5 = nil + entry.FileSize = entry.Size() + } filerResult = &FilerPostResult{ Name: fileName, @@ -217,13 +242,17 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa entry.Extended = SaveAmzMetaData(r, entry.Extended, false) for k, v := range r.Header { - if len(v) > 0 && (strings.HasPrefix(k, needle.PairNamePrefix) || k == "Cache-Control" || k == "Expires") { - entry.Extended[k] = []byte(v[0]) + if len(v) > 0 && len(v[0]) > 0 { + if strings.HasPrefix(k, needle.PairNamePrefix) || k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" { + entry.Extended[k] = []byte(v[0]) + } + if k == "Response-Content-Disposition" { + entry.Extended["Content-Disposition"] = []byte(v[0]) + } } } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { - fs.filer.DeleteChunks(fileChunks) + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, skipCheckParentDirEntry(r)); dbErr != nil { replyerr = dbErr filerResult.Error = dbErr.Error() glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr) @@ -241,7 +270,16 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs } // upload the chunk to the volume server - uploadResult, uploadErr, _ := operation.Upload(urlLocation, name, fs.option.Cipher, reader, false, "", nil, auth) + uploadOption := &operation.UploadOption{ + UploadUrl: urlLocation, + Filename: name, + Cipher: fs.option.Cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: auth, + } + uploadResult, uploadErr, _ := operation.Upload(reader, uploadOption) if uploadErr != nil { return nil, "", "", uploadErr } @@ -291,7 +329,7 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http Name: util.FullPath(path).Name(), } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false); dbErr != nil { replyerr = dbErr filerResult.Error = dbErr.Error() glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr) @@ -308,21 +346,23 @@ func SaveAmzMetaData(r *http.Request, existing map[string][]byte, isReplace bool } } 
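// A sketch of the header capture added above: a small whitelist of request
// headers is copied into the entry's extended attributes, and
// Response-Content-Disposition is stored under Content-Disposition so later
// reads can replay it. The prefix constant here is a stand-in for
// needle.PairNamePrefix; the real value lives in weed/storage/needle.
package example

import (
	"net/http"
	"strings"
)

const pairNamePrefix = "Seaweed-" // assumed stand-in for needle.PairNamePrefix

func captureHeaders(h http.Header, extended map[string][]byte) {
	for k, v := range h {
		if len(v) == 0 || len(v[0]) == 0 {
			continue
		}
		if strings.HasPrefix(k, pairNamePrefix) || k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" {
			extended[k] = []byte(v[0])
		}
		if k == "Response-Content-Disposition" {
			extended["Content-Disposition"] = []byte(v[0])
		}
	}
}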
- if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" { - metadata[xhttp.AmzStorageClass] = []byte(sc) + if sc := r.Header.Get(s3_constants.AmzStorageClass); sc != "" { + metadata[s3_constants.AmzStorageClass] = []byte(sc) } - if tags := r.Header.Get(xhttp.AmzObjectTagging); tags != "" { + if tags := r.Header.Get(s3_constants.AmzObjectTagging); tags != "" { for _, v := range strings.Split(tags, "&") { tag := strings.Split(v, "=") if len(tag) == 2 { - metadata[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1]) + metadata[s3_constants.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1]) + } else if len(tag) == 1 { + metadata[s3_constants.AmzObjectTagging+"-"+tag[0]] = nil } } } for header, values := range r.Header { - if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) { + if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) { for _, value := range values { metadata[header] = []byte(value) } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index acaa8f5ab..1f10d044e 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -44,7 +44,16 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht // println("detect2 mimetype to", pu.MimeType) } - uploadResult, uploadError := operation.UploadData(urlLocation, pu.FileName, true, uncompressedData, false, pu.MimeType, pu.PairMap, auth) + uploadOption := &operation.UploadOption{ + UploadUrl: urlLocation, + Filename: pu.FileName, + Cipher: true, + IsInputCompressed: false, + MimeType: pu.MimeType, + PairMap: pu.PairMap, + Jwt: auth, + } + uploadResult, uploadError := operation.UploadData(uncompressedData, uploadOption) if uploadError != nil { return nil, fmt.Errorf("upload to volume server: %v", uploadError) } @@ -64,17 +73,14 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht entry := &filer.Entry{ FullPath: util.FullPath(path), Attr: filer.Attr{ - Mtime: time.Now(), - Crtime: time.Now(), - Mode: 0660, - Uid: OS_UID, - Gid: OS_GID, - Replication: so.Replication, - Collection: so.Collection, - TtlSec: so.TtlSeconds, - DiskType: so.DiskType, - Mime: pu.MimeType, - Md5: util.Base64Md5ToBytes(pu.ContentMd5), + Mtime: time.Now(), + Crtime: time.Now(), + Mode: 0660, + Uid: OS_UID, + Gid: OS_GID, + TtlSec: so.TtlSeconds, + Mime: pu.MimeType, + Md5: util.Base64Md5ToBytes(pu.ContentMd5), }, Chunks: fileChunks, } @@ -84,7 +90,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht Size: int64(pu.OriginalDataSize), } - if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil { + if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false); dbErr != nil { fs.filer.DeleteChunks(entry.Chunks) err = dbErr filerResult.Error = dbErr.Error() diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go new file mode 100644 index 000000000..dadc6f726 --- /dev/null +++ b/weed/server/filer_server_handlers_write_merge.go @@ -0,0 +1,11 @@ +package weed_server + +import ( + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) { + //TODO merge consecutive smaller chunks into a large chunk to reduce number of chunks + return inputChunks, nil +} diff --git 
a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 2275ff1bc..fe3346402 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -3,11 +3,12 @@ package weed_server import ( "bytes" "crypto/md5" + "fmt" + "golang.org/x/exp/slices" "hash" "io" - "io/ioutil" "net/http" - "sort" + "strconv" "strings" "sync" "sync/atomic" @@ -29,9 +30,25 @@ var bufPool = sync.Pool{ } func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) { + query := r.URL.Query() + + isAppend := isAppend(r) + if query.Has("offset") { + offset := query.Get("offset") + offsetInt, err := strconv.ParseInt(offset, 10, 64) + if err != nil || offsetInt < 0 { + err = fmt.Errorf("invalid 'offset': '%s'", offset) + return nil, nil, 0, err, nil + } + if isAppend && offsetInt > 0 { + err = fmt.Errorf("cannot set offset when op=append") + return nil, nil, 0, err, nil + } + chunkOffset = offsetInt + } md5Hash = md5.New() - var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash)) + var partReader = io.NopCloser(io.TeeReader(reader, md5Hash)) var wg sync.WaitGroup var bytesBufferCounter int64 @@ -57,14 +74,15 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque dataSize, err := bytesBuffer.ReadFrom(limitedReader) - // data, err := ioutil.ReadAll(limitedReader) + // data, err := io.ReadAll(limitedReader) if err != nil || dataSize == 0 { bufPool.Put(bytesBuffer) atomic.AddInt64(&bytesBufferCounter, -1) bytesBufferLimitCond.Signal() + uploadErr = err break } - if chunkOffset == 0 && !isAppend(r) { + if chunkOffset == 0 && !isAppend { if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) { chunkOffset += dataSize smallContent = make([]byte, dataSize) @@ -109,13 +127,12 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque wg.Wait() if uploadErr != nil { + fs.filer.DeleteChunks(fileChunks) return nil, md5Hash, 0, uploadErr, nil } - - sort.Slice(fileChunks, func(i, j int) bool { - return fileChunks[i].Offset < fileChunks[j].Offset + slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) bool { + return a.Offset < b.Offset }) - return fileChunks, md5Hash, chunkOffset, nil, smallContent } @@ -127,7 +144,16 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds()) }() - uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) + uploadOption := &operation.UploadOption{ + UploadUrl: urlLocation, + Filename: fileName, + Cipher: fs.option.Cipher, + IsInputCompressed: false, + MimeType: contentType, + PairMap: pairMap, + Jwt: auth, + } + uploadResult, err, data := operation.Upload(limitedReader, uploadOption) if uploadResult != nil && uploadResult.RetryCount > 0 { stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount)) } diff --git a/weed/server/filer_server_rocksdb.go b/weed/server/filer_server_rocksdb.go index 5fcc7e88f..75965e761 100644 --- a/weed/server/filer_server_rocksdb.go +++ b/weed/server/filer_server_rocksdb.go @@ -1,3 
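// The ?offset=N parsing added to uploadReaderToChunks above rejects negative
// values and refuses to combine an explicit offset with op=append, since
// append computes its own position from the existing file size. The
// validation in isolation:
package main

import (
	"fmt"
	"strconv"
)

func parseOffset(offset string, isAppend bool) (int64, error) {
	if offset == "" {
		return 0, nil
	}
	n, err := strconv.ParseInt(offset, 10, 64)
	if err != nil || n < 0 {
		return 0, fmt.Errorf("invalid 'offset': '%s'", offset)
	}
	if isAppend && n > 0 {
		return 0, fmt.Errorf("cannot set offset when op=append")
	}
	return n, nil
}

func main() {
	fmt.Println(parseOffset("1024", false)) // 1024 <nil>
	fmt.Println(parseOffset("-1", false))   // 0 with error
	fmt.Println(parseOffset("8", true))     // 0 with error: append sets its own offset
}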
+1,4 @@ +//go:build rocksdb // +build rocksdb package weed_server diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go index 5016117a8..3201ff76c 100644 --- a/weed/server/filer_ui/breadcrumb.go +++ b/weed/server/filer_ui/breadcrumb.go @@ -15,8 +15,12 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) { parts := strings.Split(fullpath, "/") for i := 0; i < len(parts); i++ { + name := parts[i] + if name == "" { + name = "/" + } crumb := Breadcrumb{ - Name: parts[i] + " /", + Name: name, Link: "/" + util.Join(parts[0:i+1]...), } if !strings.HasSuffix(crumb.Link, "/") { diff --git a/weed/server/filer_ui/filer.html b/weed/server/filer_ui/filer.html index 84dc4d4d6..c9d832e8f 100644 --- a/weed/server/filer_ui/filer.html +++ b/weed/server/filer_ui/filer.html @@ -11,6 +11,7 @@ #drop-area { border: 1px transparent; + margin-top: 5px; } #drop-area.highlight { @@ -26,6 +27,12 @@ border-radius: 2px; border: 1px solid #ccc; float: right; + margin-left: 2px; + margin-bottom: 0; + } + + label { + font-weight: normal; } .button:hover { @@ -36,10 +43,37 @@ display: none; } - .qrImage { + td, th { + vertical-align: bottom; + } + + .table-hover > tbody > tr:hover > * > div.operations { display: block; - margin-left: auto; - margin-right: auto; + } + + .table > tbody > tr { + height: 39px; + } + + div.operations { + display: none; + } + + .footer { + position: absolute; + bottom: 0px; + right: 5%; + min-width: 25%; + border-left: 1px solid #ccc; + border-right: 1px solid #ccc; + } + + .add-files { + font-size: 46px; + text-align: center; + border: 1px dashed #999; + padding-bottom: 9px; + margin: 0 2px; } </style> </head> @@ -53,12 +87,21 @@ </div> <div class="row"> <div> + <div class="btn-group btn-group-sm pull-right" role="group" style="margin-top:3px;"> + <label class="btn btn-default" onclick="handleCreateDir()"> + <span class="glyphicon glyphicon-plus" aria-hidden="true"></span> New Folder + </label> + <label class="btn btn-default" for="fileElem"> + <span class="glyphicon glyphicon-cloud-upload" aria-hidden="true"></span> Upload + </label> + </div> + <ol class="breadcrumb"> {{ range $entry := .Breadcrumbs }} - <a href="{{ printpath $entry.Link }}"> + <li><a href="{{ printpath $entry.Link }}"> {{ $entry.Name }} - </a> + </li></a> {{ end }} - <label class="button" for="fileElem">Upload</label> + </ol> </div> </div> @@ -66,117 +109,250 @@ <form class="upload-form"> <input type="file" id="fileElem" multiple onchange="handleFiles(this.files)"> - <table width="90%"> - {{$path := .Path }} + {{ if .EmptyFolder }} + <div class="row add-files"> + + + </div> + {{ else }} + <table width="100%" class="table table-hover"> + {{ $path := .Path }} + {{ $showDirDel := .ShowDirectoryDelete }} {{ range $entry_index, $entry := .Entries }} <tr> <td> - {{if $entry.IsDirectory}} - <img src="/seaweedfsstatic/images/folder.gif" width="20" height="23"> + {{ if $entry.IsDirectory }} + <span class="glyphicon glyphicon-folder-open" aria-hidden="true"></span> <a href="{{ printpath $path "/" $entry.Name "/"}}" > {{ $entry.Name }} </a> - {{else}} + {{ else }} <a href="{{ printpath $path "/" $entry.Name }}" > {{ $entry.Name }} </a> - {{end}} + {{ end }} </td> <td align="right" nowrap> - {{if $entry.IsDirectory}} - {{else}} + {{ if not $entry.IsDirectory }} {{ $entry.Mime }} - {{end}} + {{ end }} </td> <td align="right" nowrap> - {{if $entry.IsDirectory}} - {{else}} + {{ if not $entry.IsDirectory }} {{ $entry.Size | humanizeBytes }} - {{end}} + {{ end }} </td> - <td nowrap> + <td align="right" 
nowrap> {{ $entry.Timestamp.Format "2006-01-02 15:04" }} </td> + <td style="width:75px"> + <div class="btn-group btn-group-xs pull-right operations" role="group"> + <label class="btn" onclick="handleRename('{{ $entry.Name }}', '{{ printpath $path "/" }}')"> + <span class="glyphicon glyphicon-edit" aria-hidden="true"></span> + </label> + {{ if and $entry.IsDirectory $showDirDel }} + <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name "/" }}')"> + <span class="glyphicon glyphicon-trash" aria-hidden="true"></span> + </label> + {{ end }} + {{ if not $entry.IsDirectory }} + <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name }}')"> + <span class="glyphicon glyphicon-trash" aria-hidden="true"></span> + </label> + {{ end }} + </div> + </td> </tr> {{ end }} - </table> + {{ end }} </form> </div> - {{if .ShouldDisplayLoadMore}} + {{ if .ShouldDisplayLoadMore }} <div class="row"> - <a href={{ print .Path "?limit=" .Limit "&lastFileName=" .LastFileName}} > + <a href={{ print .Path "?limit=" .Limit "&lastFileName=" .LastFileName }} > Load more </a> </div> - {{end}} + {{ end }} <br/> <br/> - - <div class="navbar navbar-fixed-bottom"> - <img src="data:image/png;base64,{{.QrImage}}" class="qrImage"/> + <div id="progress-area" class="footer" style="display: none;"> </div> - </div> </body> <script type="text/javascript"> // ************************ Drag and drop ***************** // - let dropArea = document.getElementById("drop-area") + let dropArea = document.getElementById("drop-area"); + let progressArea = document.getElementById("progress-area"); // Prevent default drag behaviors ;['dragenter', 'dragover', 'dragleave', 'drop'].forEach(eventName => { - dropArea.addEventListener(eventName, preventDefaults, false) - document.body.addEventListener(eventName, preventDefaults, false) - }) + dropArea.addEventListener(eventName, preventDefaults, false); + document.body.addEventListener(eventName, preventDefaults, false); + }); // Highlight drop area when item is dragged over it ;['dragenter', 'dragover'].forEach(eventName => { - dropArea.addEventListener(eventName, highlight, false) - }) + dropArea.addEventListener(eventName, highlight, false); + }); ;['dragleave', 'drop'].forEach(eventName => { - dropArea.addEventListener(eventName, unhighlight, false) - }) + dropArea.addEventListener(eventName, unhighlight, false); + }); // Handle dropped files - dropArea.addEventListener('drop', handleDrop, false) + dropArea.addEventListener('drop', handleDrop, false); function preventDefaults(e) { - e.preventDefault() - e.stopPropagation() + e.preventDefault(); + e.stopPropagation(); } function highlight(e) { - dropArea.classList.add('highlight') + dropArea.classList.add('highlight'); } function unhighlight(e) { - dropArea.classList.remove('highlight') + dropArea.classList.remove('highlight'); } function handleDrop(e) { - var dt = e.dataTransfer - var files = dt.files + var dt = e.dataTransfer; + var files = dt.files; + + handleFiles(files); + } - handleFiles(files) + function reloadPage() { + window.location.reload(true); } + var uploadList = {}; + function handleFiles(files) { - files = [...files] - files.forEach(uploadFile) - window.location.reload() + files = [...files]; + files.forEach(startUpload); + renderProgress(); + files.forEach(uploadFile); + } + + function startUpload(file, i) { + uploadList[file.name] = {'name': file.name, 'percent': 0, 'finish': false}; + } + + function renderProgress() { + var values = Object.values(uploadList); + var html = '<table 
class="table">\n<tr><th>Uploading</th><\/tr>\n'; + for (let i of values) { + var progressBarClass = 'progress-bar-striped active'; + if (i.percent >= 100) { + progressBarClass = 'progress-bar-success'; + } + html += '<tr>\n<td>\n'; + html += '<div class="progress" style="margin-bottom: 2px;">\n'; + html += '<div class="progress-bar ' + progressBarClass + '" role="progressbar" aria-valuenow="' + '100" aria-valuemin="0" aria-valuemax="100" style="width:' + i.percent + '%;">'; + html += '<span style="margin-right: 10px;">' + i.name + '</span>' + i.percent + '%<\/div>'; + html += '<\/div>\n<\/td>\n<\/tr>\n'; + } + html += '<\/table>\n'; + progressArea.innerHTML = html; + if (values.length > 0) { + progressArea.attributes.style.value = ''; + } + } + + function reportProgress(file, percent) { + var item = uploadList[file] + item.percent = percent; + renderProgress(); + } + + function finishUpload(file) { + uploadList[file]['finish'] = true; + renderProgress(); + var allFinish = true; + for (let i of Object.values(uploadList)) { + if (!i.finish) { + allFinish = false; + break; + } + } + if (allFinish) { + console.log('All Finish'); + reloadPage(); + } } function uploadFile(file, i) { - var url = window.location.href - var xhr = new XMLHttpRequest() - var formData = new FormData() - xhr.open('POST', url, false) + var url = window.location.href; + var xhr = new XMLHttpRequest(); + var fileName = file.name; + xhr.upload.addEventListener('progress', function(e) { + if (e.lengthComputable) { + var percent = Math.ceil((e.loaded / e.total) * 100); + reportProgress(fileName, percent) + } + }); + xhr.upload.addEventListener('loadend', function(e) { + finishUpload(fileName); + }); + var formData = new FormData(); + xhr.open('POST', url, true); + formData.append('file', file); + xhr.send(formData); + } + + function handleCreateDir() { + var dirName = prompt('Folder Name:', ''); + dirName = dirName.trim(); + if (dirName == null || dirName == '') { + return; + } + var baseUrl = window.location.href; + if (!baseUrl.endsWith('/')) { + baseUrl += '/'; + } + var url = baseUrl + dirName; + if (!url.endsWith('/')) { + url += '/'; + } + var xhr = new XMLHttpRequest(); + xhr.open('POST', url, false); + xhr.setRequestHeader('Content-Type', ''); + xhr.send(); + reloadPage(); + } + + function handleRename(originName, basePath) { + var newName = prompt('New Name:', originName); + if (newName == null || newName == '') { + return; + } + var url = basePath + newName; + var originPath = basePath + originName; + url += '?mv.from=' + originPath; + var xhr = new XMLHttpRequest(); + xhr.open('POST', url, false); + xhr.setRequestHeader('Content-Type', ''); + xhr.send(); + reloadPage(); + } + + function handleDelete(path) { + if (!confirm('Are you sure to delete ' + path + '?')) { + return; + } + var url = path; + if (url.endsWith('/')) { + url += '?recursive=true'; + } - formData.append('file', file) - xhr.send(formData) + var xhr = new XMLHttpRequest(); + xhr.open('DELETE', url, false); + xhr.send(); + reloadPage(); } </script> </html> diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index afd479b21..4d0fbbc41 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -2,12 +2,17 @@ package weed_server import ( "context" + "errors" "fmt" - "github.com/chrislusf/seaweedfs/weed/storage/backend" "net" - "strings" + "sort" "time" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/stats" + 
"github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/util" + "github.com/chrislusf/raft" "google.golang.org/grpc/peer" @@ -17,16 +22,56 @@ import ( "github.com/chrislusf/seaweedfs/weed/topology" ) +func (ms *MasterServer) RegisterUuids(heartbeat *master_pb.Heartbeat) (duplicated_uuids []string, err error) { + ms.Topo.UuidAccessLock.Lock() + defer ms.Topo.UuidAccessLock.Unlock() + key := fmt.Sprintf("%s:%d", heartbeat.Ip, heartbeat.Port) + if ms.Topo.UuidMap == nil { + ms.Topo.UuidMap = make(map[string][]string) + } + // find whether new uuid exists + for k, v := range ms.Topo.UuidMap { + sort.Strings(v) + for _, id := range heartbeat.LocationUuids { + index := sort.SearchStrings(v, id) + if index < len(v) && v[index] == id { + duplicated_uuids = append(duplicated_uuids, id) + glog.Errorf("directory of %s on %s has been loaded", id, k) + } + } + } + if len(duplicated_uuids) > 0 { + return duplicated_uuids, errors.New("volume: Duplicated volume directories were loaded") + } + + ms.Topo.UuidMap[key] = heartbeat.LocationUuids + glog.V(0).Infof("found new uuid:%v %v , %v", key, heartbeat.LocationUuids, ms.Topo.UuidMap) + return nil, nil +} + +func (ms *MasterServer) UnRegisterUuids(ip string, port int) { + ms.Topo.UuidAccessLock.Lock() + defer ms.Topo.UuidAccessLock.Unlock() + key := fmt.Sprintf("%s:%d", ip, port) + delete(ms.Topo.UuidMap, key) + glog.V(0).Infof("remove volume server %v, online volume server: %v", key, ms.Topo.UuidMap) +} + func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error { var dn *topology.DataNode defer func() { if dn != nil { - + dn.Counter-- + if dn.Counter > 0 { + glog.V(0).Infof("disconnect phantom volume server %s:%d remaining %d", dn.Ip, dn.Port, dn.Counter) + return + } // if the volume server disconnects and reconnects quickly // the unregister and register can race with each other ms.Topo.UnRegisterDataNode(dn) glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port) + ms.UnRegisterUuids(dn.Ip, dn.Port) message := &master_pb.VolumeLocation{ Url: dn.Url(), @@ -40,13 +85,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(message.DeletedVids) > 0 { - ms.clientChansLock.RLock() - for _, ch := range ms.clientChans { - ch <- message - } - ms.clientChansLock.RUnlock() + ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message}) } - } }() @@ -58,6 +98,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } else { glog.Warningf("SendHeartbeat.Recv: %v", err) } + stats.MasterReceivedHeartbeatCounter.WithLabelValues("error").Inc() return err } @@ -67,19 +108,34 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) dc := ms.Topo.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) - dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts) - glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) + dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts) + glog.V(0).Infof("added volume server %d: %v:%d %v", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUuids) + uuidlist, err := ms.RegisterUuids(heartbeat) + if err != nil { + if stream_err := 
stream.Send(&master_pb.HeartbeatResponse{ + DuplicatedUuids: uuidlist, + }); stream_err != nil { + glog.Warningf("SendHeartbeat.Send DuplicatedDirectory response to %s:%d %v", dn.Ip, dn.Port, stream_err) + return stream_err + } + return err + } + if err := stream.Send(&master_pb.HeartbeatResponse{ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, }); err != nil { glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err) return err } + stats.MasterReceivedHeartbeatCounter.WithLabelValues("dataNode").Inc() + dn.Counter++ } dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts) glog.V(4).Infof("master received heartbeat %s", heartbeat.String()) + stats.MasterReceivedHeartbeatCounter.WithLabelValues("total").Inc() + var dataCenter string if dc := dn.GetDataCenter(); dc != nil { dataCenter = string(dc.Id()) @@ -89,6 +145,12 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ PublicUrl: dn.PublicUrl, DataCenter: dataCenter, } + if len(heartbeat.NewVolumes) > 0 { + stats.FilerRequestCounter.WithLabelValues("newVolumes").Inc() + } + if len(heartbeat.DeletedVolumes) > 0 { + stats.FilerRequestCounter.WithLabelValues("deletedVolumes").Inc() + } if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 { // process delta volume ids if exists for fast volume id updates for _, volInfo := range heartbeat.NewVolumes { @@ -102,7 +164,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes { + dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack) + ms.Topo.DataNodeRegistration(dcName, rackName, dn) + // process heartbeat.Volumes + stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc() newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn) for _, v := range newVolumes { @@ -116,45 +182,41 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 { - + stats.MasterReceivedHeartbeatCounter.WithLabelValues("newEcShards").Inc() // update master internal volume layouts ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn) for _, s := range heartbeat.NewEcShards { - message.NewVids = append(message.NewVids, s.Id) + message.NewEcVids = append(message.NewEcVids, s.Id) } for _, s := range heartbeat.DeletedEcShards { - if dn.HasVolumesById(needle.VolumeId(s.Id)) { + if dn.HasEcShards(needle.VolumeId(s.Id)) { continue } - message.DeletedVids = append(message.DeletedVids, s.Id) + message.DeletedEcVids = append(message.DeletedEcVids, s.Id) } } if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards { - glog.V(1).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) + stats.MasterReceivedHeartbeatCounter.WithLabelValues("ecShards").Inc() + glog.V(4).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards) newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn) // broadcast the ec vid changes to master clients for _, s := range newShards { - message.NewVids = append(message.NewVids, uint32(s.VolumeId)) + message.NewEcVids = append(message.NewEcVids, uint32(s.VolumeId)) } for _, s := range deletedShards { if dn.HasVolumesById(s.VolumeId) { continue } - message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId)) + message.DeletedEcVids = 
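// The stats.MasterReceivedHeartbeatCounter calls sprinkled through this hunk
// follow the usual prometheus client_golang CounterVec pattern: one counter,
// one label distinguishing the heartbeat path. The real metric definitions
// live in weed/stats; this standalone equivalent only illustrates the pattern:
package main

import "github.com/prometheus/client_golang/prometheus"

var receivedHeartbeats = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "SeaweedFS",
		Subsystem: "master",
		Name:      "received_heartbeats",
		Help:      "Counter of heartbeats received by the master, by type.",
	},
	[]string{"type"},
)

func main() {
	prometheus.MustRegister(receivedHeartbeats)
	receivedHeartbeats.WithLabelValues("total").Inc()
	receivedHeartbeats.WithLabelValues("error").Inc()
}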
append(message.DeletedEcVids, uint32(s.VolumeId)) } } - if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 { - ms.clientChansLock.RLock() - for host, ch := range ms.clientChans { - glog.V(0).Infof("master send to %s: %s", host, message.String()) - ch <- message - } - ms.clientChansLock.RUnlock() + if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 || len(message.NewEcVids) > 0 || len(message.DeletedEcVids) > 0 { + ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message}) } // tell the volume servers about the leader @@ -164,7 +226,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ return err } if err := stream.Send(&master_pb.HeartbeatResponse{ - Leader: newLeader, + Leader: string(newLeader), }); err != nil { glog.Warningf("SendHeartbeat.Send response to to %s:%d %v", dn.Ip, dn.Port, err) return err @@ -185,17 +247,25 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ return ms.informNewLeader(stream) } - peerAddress := findClientAddress(stream.Context(), req.GrpcPort) + peerAddress := pb.ServerAddress(req.ClientAddress) // buffer by 1 so we don't end up getting stuck writing to stopChan forever stopChan := make(chan bool, 1) - clientName, messageChan := ms.addClient(req.Name, peerAddress) + clientName, messageChan := ms.addClient(req.FilerGroup, req.ClientType, peerAddress) + for _, update := range ms.Cluster.AddClusterNode(req.FilerGroup, req.ClientType, peerAddress, req.Version) { + ms.broadcastToClients(update) + } - defer ms.deleteClient(clientName) + defer func() { + for _, update := range ms.Cluster.RemoveClusterNode(req.FilerGroup, req.ClientType, peerAddress) { + ms.broadcastToClients(update) + } + ms.deleteClient(clientName) + }() for _, message := range ms.Topo.ToVolumeLocations() { - if sendErr := stream.Send(message); sendErr != nil { + if sendErr := stream.Send(&master_pb.KeepConnectedResponse{VolumeLocation: message}); sendErr != nil { return sendErr } } @@ -221,7 +291,10 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ } case <-ticker.C: if !ms.Topo.IsLeader() { + stats.MasterRaftIsleader.Set(0) return ms.informNewLeader(stream) + } else { + stats.MasterRaftIsleader.Set(1) } case <-stopChan: return nil @@ -230,22 +303,32 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ } +func (ms *MasterServer) broadcastToClients(message *master_pb.KeepConnectedResponse) { + ms.clientChansLock.RLock() + for _, ch := range ms.clientChans { + ch <- message + } + ms.clientChansLock.RUnlock() +} + func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error { leader, err := ms.Topo.Leader() if err != nil { glog.Errorf("topo leader: %v", err) return raft.NotLeaderError } - if err := stream.Send(&master_pb.VolumeLocation{ - Leader: leader, + if err := stream.Send(&master_pb.KeepConnectedResponse{ + VolumeLocation: &master_pb.VolumeLocation{ + Leader: string(leader), + }, }); err != nil { return err } return nil } -func (ms *MasterServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *master_pb.VolumeLocation) { - clientName = clientType + "@" + clientAddress +func (ms *MasterServer) addClient(filerGroup, clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) { + clientName = filerGroup + "." 
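// broadcastToClients above fans one message out to every connected client
// channel under a read lock; the channels are buffered at creation (100 slots,
// per the comment in addClient) so a slow receiver cannot deadlock the
// heartbeat path. The shape of that pattern, reduced to a runnable sketch:
package main

import (
	"fmt"
	"sync"
)

type hub struct {
	mu    sync.RWMutex
	chans map[string]chan string
}

func (h *hub) add(name string) chan string {
	ch := make(chan string, 100) // buffered, mirroring the 100-slot channel above
	h.mu.Lock()
	h.chans[name] = ch
	h.mu.Unlock()
	return ch
}

func (h *hub) broadcast(msg string) {
	h.mu.RLock()
	for _, ch := range h.chans {
		ch <- msg
	}
	h.mu.RUnlock()
}

func main() {
	h := &hub{chans: map[string]chan string{}}
	ch := h.add("filer@10.0.0.2:8888")
	h.broadcast("volume location update")
	fmt.Println(<-ch)
}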
+ clientType + "@" + string(clientAddress) glog.V(0).Infof("+ client %v", clientName) // we buffer this because otherwise we end up in a potential deadlock where @@ -253,7 +336,7 @@ func (ms *MasterServer) addClient(clientType string, clientAddress string) (clie // trying to send to it in SendHeartbeat and so we can't lock the // clientChansLock to remove the channel and we're stuck writing to it // 100 is probably overkill - messageChan = make(chan *master_pb.VolumeLocation, 100) + messageChan = make(chan *master_pb.KeepConnectedResponse, 100) ms.clientChansLock.Lock() ms.clientChans[clientName] = messageChan @@ -284,25 +367,12 @@ func findClientAddress(ctx context.Context, grpcPort uint32) string { } if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok { externalIP := tcpAddr.IP - return fmt.Sprintf("%s:%d", externalIP, grpcPort) + return util.JoinHostPort(externalIP.String(), int(grpcPort)) } return pr.Addr.String() } -func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.ListMasterClientsRequest) (*master_pb.ListMasterClientsResponse, error) { - resp := &master_pb.ListMasterClientsResponse{} - ms.clientChansLock.RLock() - defer ms.clientChansLock.RUnlock() - - for k := range ms.clientChans { - if strings.HasPrefix(k, req.ClientType+"@") { - resp.GrpcAddresses = append(resp.GrpcAddresses, k[len(req.ClientType)+1:]) - } - } - return resp, nil -} - func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) { // tell the volume servers about the leader @@ -315,7 +385,7 @@ func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_ DefaultReplication: ms.option.DefaultReplicaPlacement, VolumeSizeLimitMB: uint32(ms.option.VolumeSizeLimitMB), VolumePreallocate: ms.option.VolumePreallocate, - Leader: leader, + Leader: string(leader), } return resp, nil diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go index 93c9e4e4e..1f37e979a 100644 --- a/weed/server/master_grpc_server_admin.go +++ b/weed/server/master_grpc_server_admin.go @@ -3,7 +3,11 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "math/rand" "sync" "time" @@ -62,6 +66,7 @@ type AdminLock struct { accessSecret int64 accessLockTime time.Time lastClient string + lastMessage string } type AdminLocks struct { @@ -75,15 +80,15 @@ func NewAdminLocks() *AdminLocks { } } -func (locks *AdminLocks) isLocked(lockName string) (clientName string, isLocked bool) { +func (locks *AdminLocks) isLocked(lockName string) (clientName string, message string, isLocked bool) { locks.RLock() defer locks.RUnlock() adminLock, found := locks.locks[lockName] if !found { - return "", false + return "", "", false } - glog.V(4).Infof("isLocked %v", adminLock.lastClient) - return adminLock.lastClient, adminLock.accessLockTime.Add(LockDuration).After(time.Now()) + glog.V(4).Infof("isLocked %v: %v", adminLock.lastClient, adminLock.lastMessage) + return adminLock.lastClient, adminLock.lastMessage, adminLock.accessLockTime.Add(LockDuration).After(time.Now()) } func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64) bool { @@ -117,7 +122,7 @@ func (locks *AdminLocks) deleteLock(lockName string) { func (ms *MasterServer) 
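// The admin lock in this file is a lease, not a mutex: a holder counts as
// "locked" only while accessLockTime + LockDuration is still in the future,
// so a crashed client simply expires instead of wedging the cluster. The
// liveness test in isolation (lockDuration is a stand-in for the server's
// LockDuration constant):
package main

import (
	"fmt"
	"time"
)

const lockDuration = 10 * time.Minute // assumed stand-in for LockDuration

type adminLock struct {
	accessLockTime time.Time
	lastClient     string
}

func (l *adminLock) isLocked(now time.Time) bool {
	return l.accessLockTime.Add(lockDuration).After(now)
}

func main() {
	l := &adminLock{accessLockTime: time.Now(), lastClient: "shell"}
	fmt.Println(l.isLocked(time.Now()))                       // true: lease still live
	fmt.Println(l.isLocked(time.Now().Add(11 * time.Minute))) // false: lease expired
}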
LeaseAdminToken(ctx context.Context, req *master_pb.LeaseAdminTokenRequest) (*master_pb.LeaseAdminTokenResponse, error) { resp := &master_pb.LeaseAdminTokenResponse{} - if lastClient, isLocked := ms.adminLocks.isLocked(req.LockName); isLocked { + if lastClient, lastMessage, isLocked := ms.adminLocks.isLocked(req.LockName); isLocked { glog.V(4).Infof("LeaseAdminToken %v", lastClient) if req.PreviousToken != 0 && ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) { // for renew @@ -126,7 +131,7 @@ func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.Leas return resp, nil } // refuse since still locked - return resp, fmt.Errorf("already locked by " + lastClient) + return resp, fmt.Errorf("already locked by %v: %v", lastClient, lastMessage) } // for fresh lease request ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName) @@ -141,3 +146,41 @@ func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.Re } return resp, nil } + +func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (resp *master_pb.PingResponse, pingErr error) { + resp = &master_pb.PingResponse{ + StartTimeNs: time.Now().UnixNano(), + } + if req.TargetType == cluster.FilerType { + pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if req.TargetType == cluster.VolumeServerType { + pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if req.TargetType == cluster.MasterType { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if pingErr != nil { + pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr) + } + resp.StopTimeNs = time.Now().UnixNano() + return +} diff --git a/weed/server/master_grpc_server_cluster.go b/weed/server/master_grpc_server_cluster.go new file mode 100644 index 000000000..fea4a66aa --- /dev/null +++ b/weed/server/master_grpc_server_cluster.go @@ -0,0 +1,41 @@ +package weed_server + +import ( + "context" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "math/rand" +) + +func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.ListClusterNodesRequest) (*master_pb.ListClusterNodesResponse, error) { + resp := &master_pb.ListClusterNodesResponse{} + filerGroup := cluster.FilerGroup(req.FilerGroup) + clusterNodes := ms.Cluster.ListClusterNode(filerGroup, req.ClientType) + + for _, node := range clusterNodes { + resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{ + Address: string(node.Address), + Version: node.Version, + IsLeader: ms.Cluster.IsOneLeader(filerGroup, node.Address), + CreatedAtNs: node.CreatedTs.UnixNano(), + }) + } + return resp, nil +} + +func (ms *MasterServer) 
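// The Ping RPC above returns three timestamps -- local start, remote start,
// local stop -- which is enough for a caller to estimate round-trip time and
// a rough clock offset, assuming symmetric network latency. The arithmetic,
// as one might apply it on the client side:
package main

import "fmt"

func analyze(startNs, remoteNs, stopNs int64) (rttNs, offsetNs int64) {
	rttNs = stopNs - startNs
	// midpoint of the local interval is when the remote stamp "should" read
	offsetNs = remoteNs - (startNs+stopNs)/2
	return
}

func main() {
	rtt, off := analyze(1_000_000, 1_600_000, 2_000_000)
	fmt.Println(rtt, off) // 1000000 100000 -- 1ms RTT, remote ~0.1ms ahead
}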
GetOneFiler(filerGroup cluster.FilerGroup) pb.ServerAddress { + + clusterNodes := ms.Cluster.ListClusterNode(filerGroup, cluster.FilerType) + + var filers []pb.ServerAddress + for _, node := range clusterNodes { + if ms.Cluster.IsOneLeader(filerGroup, node.Address) { + filers = append(filers, node.Address) + } + } + if len(filers) > 0 { + return filers[rand.Intn(len(filers))] + } + return "localhost:8888" +} diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go index b92d6bcbe..654da6b3c 100644 --- a/weed/server/master_grpc_server_collection.go +++ b/weed/server/master_grpc_server_collection.go @@ -58,7 +58,7 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error { } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) @@ -78,7 +78,7 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error { listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName) for _, server := range listOfEcServers { - err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collectionName, }) diff --git a/weed/server/master_grpc_server_raft.go b/weed/server/master_grpc_server_raft.go new file mode 100644 index 000000000..258b6beac --- /dev/null +++ b/weed/server/master_grpc_server_raft.go @@ -0,0 +1,79 @@ +package weed_server + +import ( + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/hashicorp/raft" +) + +func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_pb.RaftListClusterServersRequest) (*master_pb.RaftListClusterServersResponse, error) { + resp := &master_pb.RaftListClusterServersResponse{} + + if ms.Topo.HashicorpRaft == nil { + return resp, nil + } + + servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers + + for _, server := range servers { + resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{ + Id: string(server.ID), + Address: string(server.Address), + Suffrage: server.Suffrage.String(), + }) + } + return resp, nil +} + +func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) { + resp := &master_pb.RaftAddServerResponse{} + + if ms.Topo.HashicorpRaft == nil { + return resp, nil + } + + if ms.Topo.HashicorpRaft.State() != raft.Leader { + return nil, fmt.Errorf("raft add server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String()) + } + + var idxFuture raft.IndexFuture + if req.Voter { + idxFuture = ms.Topo.HashicorpRaft.AddVoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0) + } else { + idxFuture = ms.Topo.HashicorpRaft.AddNonvoter(raft.ServerID(req.Id), 
raft.ServerAddress(req.Address), 0, 0) + } + + if err := idxFuture.Error(); err != nil { + return nil, err + } + return resp, nil +} + +func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) { + resp := &master_pb.RaftRemoveServerResponse{} + + if ms.Topo.HashicorpRaft == nil { + return resp, nil + } + + if ms.Topo.HashicorpRaft.State() != raft.Leader { + return nil, fmt.Errorf("raft remove server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String()) + } + + if !req.Force { + ms.clientChansLock.RLock() + _, ok := ms.clientChans[fmt.Sprintf("%s@%s", cluster.MasterType, req.Id)] + ms.clientChansLock.RUnlock() + if ok { + return resp, fmt.Errorf("raft remove server %s failed: client connection to master exists", req.Id) + } + } + + idxFuture := ms.Topo.HashicorpRaft.RemoveServer(raft.ServerID(req.Id), 0, 0) + if err := idxFuture.Error(); err != nil { + return nil, err + } + return resp, nil +} diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 4b975a0c4..0382c2dae 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -42,15 +42,24 @@ func (ms *MasterServer) ProcessGrowRequest() { return !found }) + option := req.Option + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) + // not atomic but it's okay - if !found && ms.shouldVolumeGrow(req.Option) { + if !found && vl.ShouldGrowVolumes(option) { filter.Store(req, nil) // we have lock called inside vg go func() { glog.V(1).Infoln("starting automatic volume grow") start := time.Now() - _, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count) + newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count) glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start)) + if err == nil { + for _, newVidLocation := range newVidLocations { + ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation}) + } + } + vl.DoneGrowRequest() if req.ErrCh != nil { req.ErrCh <- err @@ -82,7 +91,7 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV } var auth string if strings.Contains(result.VolumeOrFileId, ",") { // this is a file id - auth = string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, result.VolumeOrFileId)) + auth = string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, result.VolumeOrFileId)) } resp.VolumeIdLocations = append(resp.VolumeIdLocations, &master_pb.LookupVolumeResponse_VolumeIdLocation{ VolumeOrFileId: result.VolumeOrFileId, @@ -130,10 +139,13 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb, } - if ms.shouldVolumeGrow(option) { + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) + + if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) { if ms.Topo.AvailableSpaceFor(option) <= 0 { return nil, fmt.Errorf("no free volumes left for " + option.String()) } + vl.AddGrowRequest() ms.vgCh <- &topology.VolumeGrowRequest{ Option: option, Count: int(req.WritableVolumeCount), @@ -147,14 +159,27 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest ) for time.Now().Sub(startTime) < maxTimeout { - fid, count, dn, err := 
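// Assign now gates volume growth per volume layout with the
// HasGrowRequest/AddGrowRequest/DoneGrowRequest trio so concurrent assigns do
// not queue duplicate grow requests. The same idea expressed with an atomic
// counter -- a sketch only; the actual VolumeLayout bookkeeping in
// weed/topology may be implemented differently:
package main

import (
	"fmt"
	"sync/atomic"
)

type volumeLayout struct{ growRequests int32 }

func (vl *volumeLayout) hasGrowRequest() bool { return atomic.LoadInt32(&vl.growRequests) > 0 }
func (vl *volumeLayout) addGrowRequest()      { atomic.AddInt32(&vl.growRequests, 1) }
func (vl *volumeLayout) doneGrowRequest()     { atomic.AddInt32(&vl.growRequests, -1) }

func main() {
	vl := &volumeLayout{}
	if !vl.hasGrowRequest() { // only the first caller enqueues growth
		vl.addGrowRequest()
		// ... hand a VolumeGrowRequest to the grow channel here ...
		vl.doneGrowRequest()
	}
	fmt.Println(vl.hasGrowRequest()) // false
}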
ms.Topo.PickForWrite(req.Count, option) + fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option) if err == nil { + dn := dnList.Head() + var replicas []*master_pb.Location + for _, r := range dnList.Rest() { + replicas = append(replicas, &master_pb.Location{ + Url: r.Url(), + PublicUrl: r.PublicUrl, + GrpcPort: uint32(r.GrpcPort), + }) + } return &master_pb.AssignResponse{ - Fid: fid, - Url: dn.Url(), - PublicUrl: dn.PublicUrl, - Count: count, - Auth: string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), + Fid: fid, + Location: &master_pb.Location{ + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + GrpcPort: uint32(dn.GrpcPort), + }, + Count: count, + Auth: string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), + Replicas: replicas, }, nil } //glog.V(4).Infoln("waiting for volume growing...") @@ -248,7 +273,7 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV resp := &master_pb.VacuumVolumeResponse{} - ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.preallocateSize) + ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), req.VolumeId, req.Collection, ms.preallocateSize) return resp, nil } diff --git a/weed/server/master_server.go b/weed/server/master_server.go index d2edeb6cb..9bf840f08 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -1,7 +1,9 @@ package weed_server import ( + "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/stats" "net/http" "net/http/httputil" "net/url" @@ -11,8 +13,12 @@ import ( "sync" "time" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/raft" "github.com/gorilla/mux" + hashicorpRaft "github.com/hashicorp/raft" "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/glog" @@ -26,14 +32,13 @@ import ( ) const ( - SequencerType = "master.sequencer.type" - SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls" - SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + SequencerType = "master.sequencer.type" + SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id" + RaftServerRemovalTime = 72 * time.Minute ) type MasterOption struct { - Host string - Port int + Master pb.ServerAddress MetaFolder string VolumeSizeLimitMB uint32 VolumePreallocate bool @@ -48,6 +53,7 @@ type MasterOption struct { } type MasterServer struct { + master_pb.UnimplementedSeaweedServer option *MasterOption guard *security.Guard @@ -59,18 +65,23 @@ type MasterServer struct { boundedLeaderChan chan int + onPeerUpdatDoneCn chan string + onPeerUpdatDoneCnExist bool + // notifying clients clientChansLock sync.RWMutex - clientChans map[string]chan *master_pb.VolumeLocation + clientChans map[string]chan *master_pb.KeepConnectedResponse grpcDialOption grpc.DialOption MasterClient *wdclient.MasterClient adminLocks *AdminLocks + + Cluster *cluster.Cluster } -func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer { +func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer { v := util.GetViper() signingKey := v.GetString("jwt.signing.key") @@ -100,12 +111,16 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste option: option, preallocateSize: preallocateSize, vgCh: make(chan *topology.VolumeGrowRequest, 1<<6), - clientChans: make(map[string]chan *master_pb.VolumeLocation), + clientChans: make(map[string]chan 
*master_pb.KeepConnectedResponse), grpcDialOption: grpcDialOption, - MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers), + MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", peers), adminLocks: NewAdminLocks(), + Cluster: cluster.NewCluster(), } ms.boundedLeaderChan = make(chan int, 16) + ms.onPeerUpdatDoneCn = make(chan string) + + ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate seq := ms.createSequencer(option) if nil == seq { @@ -154,18 +169,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste } func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) { - ms.Topo.RaftServer = raftServer.raftServer - ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { - glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) - if ms.Topo.RaftServer.Leader() != "" { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") - } - }) + var raftServerName string + if raftServer.raftServer != nil { + ms.Topo.RaftServer = raftServer.raftServer + ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) { + glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value()) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc() + if ms.Topo.RaftServer.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.") + } + }) + raftServerName = ms.Topo.RaftServer.Name() + } else if raftServer.RaftHashicorp != nil { + ms.Topo.HashicorpRaft = raftServer.RaftHashicorp + leaderCh := raftServer.RaftHashicorp.LeaderCh() + prevLeader := ms.Topo.HashicorpRaft.Leader() + go func() { + for { + select { + case isLeader := <-leaderCh: + leader := ms.Topo.HashicorpRaft.Leader() + glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader) + stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc() + prevLeader = leader + } + } + }() + raftServerName = ms.Topo.HashicorpRaft.String() + } if ms.Topo.IsLeader() { - glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!") + glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!") } else { - if ms.Topo.RaftServer.Leader() != "" { + if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.") + } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" { + glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.") } } } @@ -174,71 +212,70 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { f(w, r) - } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { - ms.boundedLeaderChan <- 1 - defer func() { <-ms.boundedLeaderChan }() - targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader()) - if err != nil { - writeJsonError(w, r, http.StatusInternalServerError, - fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err)) - return - } - glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader()) - proxy := httputil.NewSingleHostReverseProxy(targetUrl) - director := proxy.Director - proxy.Director = func(req *http.Request) { - 
actualHost, err := security.GetActualRemoteHost(req) - if err == nil { - req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost) - } - director(req) - } - proxy.Transport = util.Transport - proxy.ServeHTTP(w, r) - } else { - // handle requests locally + return + } + var raftServerLeader string + if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" { + raftServerLeader = ms.Topo.RaftServer.Leader() + } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" { + raftServerLeader = string(ms.Topo.HashicorpRaft.Leader()) + } + if raftServerLeader == "" { f(w, r) + return + } + ms.boundedLeaderChan <- 1 + defer func() { <-ms.boundedLeaderChan }() + targetUrl, err := url.Parse("http://" + raftServerLeader) + if err != nil { + writeJsonError(w, r, http.StatusInternalServerError, + fmt.Errorf("Leader URL http://%s Parse Error: %v", raftServerLeader, err)) + return } + glog.V(4).Infoln("proxying to leader", raftServerLeader) + proxy := httputil.NewSingleHostReverseProxy(targetUrl) + director := proxy.Director + proxy.Director = func(req *http.Request) { + actualHost, err := security.GetActualRemoteHost(req) + if err == nil { + req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost) + } + director(req) + } + proxy.Transport = util.Transport + proxy.ServeHTTP(w, r) } } func (ms *MasterServer) startAdminScripts() { - var err error v := util.GetViper() adminScripts := v.GetString("master.maintenance.scripts") - glog.V(0).Infof("adminScripts:\n%v", adminScripts) if adminScripts == "" { return } + glog.V(0).Infof("adminScripts: %v", adminScripts) v.SetDefault("master.maintenance.sleep_minutes", 17) sleepMinutes := v.GetInt("master.maintenance.sleep_minutes") - v.SetDefault("master.filer.default", "localhost:8888") - filerHostPort := v.GetString("master.filer.default") - scriptLines := strings.Split(adminScripts, "\n") if !strings.Contains(adminScripts, "lock") { scriptLines = append(append([]string{}, "lock"), scriptLines...) 
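The wrapping here guarantees that a maintenance batch always runs inside the shell's cluster lock when the configured script text does not already manage the lock itself: "lock" is prepended just before this point and "unlock" is appended just after. A minimal sketch of the resulting command list, with example script values:

package main

import (
    "fmt"
    "strings"
)

func main() {
    // Example input; the real value comes from master.maintenance.scripts.
    adminScripts := "volume.balance -force\nvolume.fix.replication"
    scriptLines := strings.Split(adminScripts, "\n")
    // Same wrapping as in startAdminScripts: run the whole batch under one lock.
    scriptLines = append(append([]string{}, "lock"), scriptLines...)
    scriptLines = append(scriptLines, "unlock")
    fmt.Println(scriptLines) // [lock volume.balance -force volume.fix.replication unlock]
}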
scriptLines = append(scriptLines, "unlock") } - masterAddress := fmt.Sprintf("%s:%d", ms.option.Host, ms.option.Port) + masterAddress := string(ms.option.Master) var shellOptions shell.ShellOptions shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master") shellOptions.Masters = &masterAddress - shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(filerHostPort) - shellOptions.FilerAddress = filerHostPort shellOptions.Directory = "/" - if err != nil { - glog.V(0).Infof("failed to parse master.filer.default = %s : %v\n", filerHostPort, err) - return - } + emptyFilerGroup := "" + shellOptions.FilerGroup = &emptyFilerGroup - commandEnv := shell.NewCommandEnv(shellOptions) + commandEnv := shell.NewCommandEnv(&shellOptions) reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`) @@ -247,9 +284,13 @@ func (ms *MasterServer) startAdminScripts() { go func() { commandEnv.MasterClient.WaitUntilConnected() - c := time.Tick(time.Duration(sleepMinutes) * time.Minute) - for range c { + for { + time.Sleep(time.Duration(sleepMinutes) * time.Minute) if ms.Topo.IsLeader() { + shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroup(*shellOptions.FilerGroup)) + if shellOptions.FilerAddress == "" { + continue + } for _, line := range scriptLines { for _, c := range strings.Split(line, ";") { processEachCmd(reg, c, commandEnv) @@ -287,19 +328,10 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer seqType := strings.ToLower(v.GetString(SequencerType)) glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType) switch strings.ToLower(seqType) { - case "etcd": - var err error - urls := v.GetString(SequencerEtcdUrls) - glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls) - seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder) - if err != nil { - glog.Error(err) - seq = nil - } case "snowflake": var err error snowflakeId := v.GetInt(SequencerSnowflakeId) - seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port), snowflakeId) + seq, err = sequence.NewSnowflakeSequencer(string(option.Master), snowflakeId) if err != nil { glog.Error(err) seq = nil @@ -309,3 +341,57 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer } return seq } + +func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) { + if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil { + return + } + glog.V(4).Infof("OnPeerUpdate: %+v", update) + + peerAddress := pb.ServerAddress(update.Address) + peerName := string(peerAddress) + isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader + if update.IsAdd { + if isLeader { + raftServerFound := false + for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerName { + raftServerFound = true + } + } + if !raftServerFound { + glog.V(0).Infof("adding new raft server: %s", peerName) + ms.Topo.HashicorpRaft.AddVoter( + hashicorpRaft.ServerID(peerName), + hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0) + } + } + if ms.onPeerUpdatDoneCnExist { + ms.onPeerUpdatDoneCn <- peerName + } + } else if isLeader { + go func(peerName string) { + for { + select { + case <-time.After(RaftServerRemovalTime): + err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error { + _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{ + Id: peerName, + Force: false, + }) + return err + }) + if err != nil { + 
glog.Warningf("failed to removing old raft server %s: %v", peerName, err) + } + return + case peerDone := <-ms.onPeerUpdatDoneCn: + if peerName == peerDone { + return + } + } + } + }(peerName) + ms.onPeerUpdatDoneCnExist = true + } +} diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 2a1f6d523..0b79c4ed5 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -113,13 +113,16 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) return } - if ms.shouldVolumeGrow(option) { + vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) + + if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) { glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr) if ms.Topo.AvailableSpaceFor(option) <= 0 { writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()}) return } errCh := make(chan error, 1) + vl.AddGrowRequest() ms.vgCh <- &topology.VolumeGrowRequest{ Option: option, Count: writableVolumeCount, @@ -130,9 +133,10 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) return } } - fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option) + fid, count, dnList, err := ms.Topo.PickForWrite(requestedCount, option) if err == nil { ms.maybeAddJwtAuthorization(w, fid, true) + dn := dnList.Head() writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count}) } else { writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) @@ -145,9 +149,9 @@ func (ms *MasterServer) maybeAddJwtAuthorization(w http.ResponseWriter, fileId s } var encodedJwt security.EncodedJwt if isWrite { - encodedJwt = security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fileId) + encodedJwt = security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fileId) } else { - encodedJwt = security.GenJwt(ms.guard.ReadSigningKey, ms.guard.ReadExpiresAfterSec, fileId) + encodedJwt = security.GenJwtForVolumeServer(ms.guard.ReadSigningKey, ms.guard.ReadExpiresAfterSec, fileId) } if encodedJwt == "" { return diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go index 4a86348d9..47abfb892 100644 --- a/weed/server/master_server_handlers_admin.go +++ b/weed/server/master_server_handlers_admin.go @@ -3,6 +3,8 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" "math/rand" "net/http" "strconv" @@ -26,7 +28,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R return } for _, server := range collection.ListVolumeServers() { - err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { _, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{ Collection: collection.Name, }) @@ -63,7 +65,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque } } // glog.Infoln("garbageThreshold =", gcThreshold) - ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.preallocateSize) + 
ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, 0, "", ms.preallocateSize) ms.dirStatusHandler(w, r) } @@ -80,7 +82,9 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request if ms.Topo.AvailableSpaceFor(option) < int64(count*option.ReplicaPlacement.GetCopyCount()) { err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), count*option.ReplicaPlacement.GetCopyCount()) } else { - count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo) + var newVidLocations []*master_pb.VolumeLocation + newVidLocations, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo) + count = len(newVidLocations) } } else { err = fmt.Errorf("can not parse parameter count %s", r.FormValue("count")) @@ -118,32 +122,19 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request) } } -func (ms *MasterServer) selfUrl(r *http.Request) string { - if r.Host != "" { - return r.Host - } - return "localhost:" + strconv.Itoa(ms.option.Port) -} func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) { if ms.Topo.IsLeader() { - submitForClientHandler(w, r, func() string { return ms.selfUrl(r) }, ms.grpcDialOption) + submitForClientHandler(w, r, func() pb.ServerAddress { return ms.option.Master }, ms.grpcDialOption) } else { masterUrl, err := ms.Topo.Leader() if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) } else { - submitForClientHandler(w, r, func() string { return masterUrl }, ms.grpcDialOption) + submitForClientHandler(w, r, func() pb.ServerAddress { return masterUrl }, ms.grpcDialOption) } } } -func (ms *MasterServer) shouldVolumeGrow(option *topology.VolumeGrowOption) bool { - vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType) - active, high := vl.GetActiveVolumeCount(option) - //glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high) - return active <= high -} - func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) { replicationString := r.FormValue("replication") if replicationString == "" { diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go index 015bfbd00..d8260d8d2 100644 --- a/weed/server/master_server_handlers_ui.go +++ b/weed/server/master_server_handlers_ui.go @@ -5,6 +5,8 @@ import ( "time" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" + ui "github.com/chrislusf/seaweedfs/weed/server/master_ui" "github.com/chrislusf/seaweedfs/weed/stats" "github.com/chrislusf/seaweedfs/weed/util" @@ -13,20 +15,40 @@ import ( func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) { infos := make(map[string]interface{}) infos["Up Time"] = time.Now().Sub(startTime).String() - args := struct { - Version string - Topology interface{} - RaftServer raft.Server - Stats map[string]interface{} - Counters *stats.ServerStats - VolumeSizeLimitMB uint32 - }{ - util.Version(), - ms.Topo.ToMap(), - ms.Topo.RaftServer, - infos, - serverStats, - ms.option.VolumeSizeLimitMB, + infos["Max Volume Id"] = ms.Topo.GetMaxVolumeId() + if ms.Topo.RaftServer != nil { + args := struct { + Version string + Topology interface{} + RaftServer raft.Server + Stats map[string]interface{} + Counters *stats.ServerStats + VolumeSizeLimitMB uint32 + }{ + util.Version(), + ms.Topo.ToMap(), + ms.Topo.RaftServer, + infos, + serverStats, + ms.option.VolumeSizeLimitMB, + } + 
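Taken together, the growth hunks above (ProcessGrowRequest, Assign, dirAssignHandler, volumeGrowHandler) replace the removed shouldVolumeGrow helper with per-layout gating: a handler enqueues a VolumeGrowRequest only when the layout has none in flight, and ProcessGrowRequest releases the gate via DoneGrowRequest once growth finishes. A condensed sketch of the assumed bookkeeping (the real VolumeLayout lives in weed/topology and is not shown in this diff):

package topology

import "sync/atomic"

// Sketch only: one in-flight counter per volume layout is enough to model
// the HasGrowRequest/AddGrowRequest/DoneGrowRequest calls used above.
type VolumeLayout struct {
    growRequestCount int32
}

func (vl *VolumeLayout) HasGrowRequest() bool { return atomic.LoadInt32(&vl.growRequestCount) > 0 }
func (vl *VolumeLayout) AddGrowRequest()      { atomic.AddInt32(&vl.growRequestCount, 1) }
func (vl *VolumeLayout) DoneGrowRequest()     { atomic.AddInt32(&vl.growRequestCount, -1) }

Callers then follow the pattern from the diff: check !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option), call vl.AddGrowRequest(), and push the request onto ms.vgCh, so at most one grow request per layout is queued at a time.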
ui.StatusTpl.Execute(w, args) + } else if ms.Topo.HashicorpRaft != nil { + args := struct { + Version string + Topology interface{} + RaftServer *hashicorpRaft.Raft + Stats map[string]interface{} + Counters *stats.ServerStats + VolumeSizeLimitMB uint32 + }{ + util.Version(), + ms.Topo.ToMap(), + ms.Topo.HashicorpRaft, + infos, + serverStats, + ms.option.VolumeSizeLimitMB, + } + ui.StatusNewRaftTpl.Execute(w, args) } - ui.StatusTpl.Execute(w, args) } diff --git a/weed/server/master_ui/masterNewRaft.html b/weed/server/master_ui/masterNewRaft.html new file mode 100644 index 000000000..32afdceac --- /dev/null +++ b/weed/server/master_ui/masterNewRaft.html @@ -0,0 +1,121 @@ +<!DOCTYPE html> +<html> +<head> + <title>SeaweedFS {{ .Version }}</title> + <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css"> +</head> +<body> +<div class="container"> + <div class="page-header"> + <h1> + <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a> + SeaweedFS <small>{{ .Version }}</small> + </h1> + </div> + + <div class="row"> + <div class="col-sm-6"> + <h2>Cluster status</h2> + <table class="table table-condensed table-striped"> + <tbody> + <tr> + <th>Volume Size Limit</th> + <td>{{ .VolumeSizeLimitMB }}MB</td> + </tr> + <tr> + <th>Free</th> + <td>{{ .Topology.Free }}</td> + </tr> + <tr> + <th>Max</th> + <td>{{ .Topology.Max }}</td> + </tr> + {{ with .RaftServer }} + <tr> + <th>Leader</th> + <td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td> + </tr> + <tr> + <th>Other Masters</th> + <td class="col-sm-5"> + <ul class="list-unstyled"> + {{ range $k, $p := .GetConfiguration.Configuration.Servers }} + <li><a href="http://{{ $p.ID }}/ui/index.html">{{ $p.ID }}</a></li> + {{ end }} + </ul> + </td> + </tr> + {{ end }} + </tbody> + </table> + </div> + + <div class="col-sm-6"> + <h2>System Stats</h2> + <table class="table table-condensed table-striped"> + <tr> + <th>Concurrent Connections</th> + <td>{{ .Counters.Connections.WeekCounter.Sum }}</td> + </tr> + {{ range $key, $val := .Stats }} + <tr> + <th>{{ $key }}</th> + <td>{{ $val }}</td> + </tr> + {{ end }} + </table> + <h2>Raft Stats</h2> + <table class="table table-condensed table-striped"> + <tr> + <th>applied_index</th> + <td>{{ .RaftServer.Stats.applied_index }}</td> + </tr> + <tr> + <th>last_log_term</th> + <td>{{ .RaftServer.Stats.last_log_term }}</td> + </tr> + </table> + </div> + </div> + + <div class="row"> + <h2>Topology</h2> + <table class="table table-striped"> + <thead> + <tr> + <th>Data Center</th> + <th>Rack</th> + <th>RemoteAddr</th> + <th>#Volumes</th> + <th>Volume Ids</th> + <th>#ErasureCodingShards</th> + <th>Max</th> + </tr> + </thead> + <tbody> + {{ range $dc_index, $dc := .Topology.DataCenters }} + {{ range $rack_index, $rack := $dc.Racks }} + {{ range $dn_index, $dn := $rack.DataNodes }} + <tr> + <td><code>{{ $dc.Id }}</code></td> + <td>{{ $rack.Id }}</td> + <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a> + {{ if ne $dn.PublicUrl $dn.Url }} + / <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a> + {{ end }} + </td> + <td>{{ $dn.Volumes }}</td> + <td>{{ $dn.VolumeIds}}</td> + <td>{{ $dn.EcShards }}</td> + <td>{{ $dn.Max }}</td> + </tr> + {{ end }} + {{ end }} + {{ end }} + </tbody> + </table> + </div> + +</div> +</body> +</html> diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go index 415022b97..a6dcc57d7 100644 --- a/weed/server/master_ui/templates.go +++ 
b/weed/server/master_ui/templates.go @@ -8,4 +8,8 @@ import ( //go:embed master.html var masterHtml string +//go:embed masterNewRaft.html +var masterNewRaftHtml string + var StatusTpl = template.Must(template.New("status").Parse(masterHtml)) +var StatusNewRaftTpl = template.Must(template.New("status").Parse(masterNewRaftHtml)) diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go new file mode 100644 index 000000000..9971eaa48 --- /dev/null +++ b/weed/server/raft_hashicorp.go @@ -0,0 +1,186 @@ +package weed_server + +// https://yusufs.medium.com/creating-distributed-kv-database-by-implementing-raft-consensus-using-golang-d0884eef2e28 +// https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18 + +import ( + "fmt" + transport "github.com/Jille/raft-grpc-transport" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/hashicorp/raft" + boltdb "github.com/hashicorp/raft-boltdb" + "google.golang.org/grpc" + "math/rand" + "os" + "path" + "path/filepath" + "sort" + "strings" + "time" +) + +const ( + ldbFile = "logs.dat" + sdbFile = "stable.dat" + updatePeersTimeout = 15 * time.Minute +) + +func getPeerIdx(self pb.ServerAddress, mapPeers map[string]pb.ServerAddress) int { + peers := make([]pb.ServerAddress, 0, len(mapPeers)) + for _, peer := range mapPeers { + peers = append(peers, peer) + } + sort.Slice(peers, func(i, j int) bool { + return strings.Compare(string(peers[i]), string(peers[j])) < 0 + }) + for i, peer := range peers { + if string(peer) == string(self) { + return i + } + } + return -1 +} + +func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) { + for _, peer := range s.peers { + cfg.Servers = append(cfg.Servers, raft.Server{ + Suffrage: raft.Voter, + ID: raft.ServerID(peer), + Address: raft.ServerAddress(peer.ToGrpcAddress()), + }) + } + return cfg +} + +func (s *RaftServer) UpdatePeers() { + for { + select { + case isLeader := <-s.RaftHashicorp.LeaderCh(): + if isLeader { + peerLeader := string(s.serverAddr) + existsPeerName := make(map[string]bool) + for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers { + if string(server.ID) == peerLeader { + continue + } + existsPeerName[string(server.ID)] = true + } + for _, peer := range s.peers { + peerName := string(peer) + if peerName == peerLeader || existsPeerName[peerName] { + continue + } + glog.V(0).Infof("adding new peer: %s", peerName) + s.RaftHashicorp.AddVoter( + raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0) + } + for peer, _ := range existsPeerName { + if _, found := s.peers[peer]; !found { + glog.V(0).Infof("removing old peer: %s", peer) + s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0) + } + } + if _, found := s.peers[peerLeader]; !found { + glog.V(0).Infof("removing old leader peer: %s", peerLeader) + s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0) + } + } + return + case <-time.After(updatePeersTimeout): + return + } + } +} + +func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) { + s := &RaftServer{ + peers: option.Peers, + serverAddr: option.ServerAddr, + dataDir: option.DataDir, + topo: option.Topo, + } + + c := raft.DefaultConfig() + c.LocalID = raft.ServerID(s.serverAddr) // TODO maybe the IP:port address will change + c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) + c.ElectionTimeout = option.ElectionTimeout + if c.LeaderLeaseTimeout >
c.HeartbeatTimeout { + c.LeaderLeaseTimeout = c.HeartbeatTimeout + } + if glog.V(4) { + c.LogLevel = "Debug" + } else if glog.V(2) { + c.LogLevel = "Info" + } else if glog.V(1) { + c.LogLevel = "Warn" + } else if glog.V(0) { + c.LogLevel = "Error" + } + + if option.RaftBootstrap { + os.RemoveAll(path.Join(s.dataDir, ldbFile)) + os.RemoveAll(path.Join(s.dataDir, sdbFile)) + os.RemoveAll(path.Join(s.dataDir, "snapshots")) + } + if err := os.MkdirAll(path.Join(s.dataDir, "snapshots"), os.ModePerm); err != nil { + return nil, err + } + baseDir := s.dataDir + + ldb, err := boltdb.NewBoltStore(filepath.Join(baseDir, ldbFile)) + if err != nil { + return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err) + } + + sdb, err := boltdb.NewBoltStore(filepath.Join(baseDir, sdbFile)) + if err != nil { + return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err) + } + + fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr) + if err != nil { + return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err) + } + + s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption}) + + stateMachine := StateMachine{topo: option.Topo} + s.RaftHashicorp, err = raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport()) + if err != nil { + return nil, fmt.Errorf("raft.NewRaft: %v", err) + } + if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 { + cfg := s.AddPeersConfiguration() + // Need to get lock, in case all servers do this at the same time. + peerIdx := getPeerIdx(s.serverAddr, s.peers) + timeSpeep := time.Duration(float64(c.LeaderLeaseTimeout) * (rand.Float64()*0.25 + 1) * float64(peerIdx)) + glog.V(0).Infof("Bootstrapping idx: %d sleep: %v new cluster: %+v", peerIdx, timeSpeep, cfg) + time.Sleep(timeSpeep) + f := s.RaftHashicorp.BootstrapCluster(cfg) + if err := f.Error(); err != nil { + return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err) + } + } else { + go s.UpdatePeers() + } + + ticker := time.NewTicker(c.HeartbeatTimeout * 10) + if glog.V(4) { + go func() { + for { + select { + case <-ticker.C: + cfuture := s.RaftHashicorp.GetConfiguration() + if err = cfuture.Error(); err != nil { + glog.Fatalf("error getting config: %s", err) + } + configuration := cfuture.Configuration() + glog.V(4).Infof("Showing peers known by %s:\n%+v", s.RaftHashicorp.String(), configuration.Servers) + } + } + }() + } + + return s, nil +} diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go index 85841e409..ad0a1c8ce 100644 --- a/weed/server/raft_server.go +++ b/weed/server/raft_server.go @@ -2,10 +2,12 @@ package weed_server import ( "encoding/json" + transport "github.com/Jille/raft-grpc-transport" + "io" + "io/ioutil" "math/rand" "os" "path" - "sort" "time" "google.golang.org/grpc" @@ -13,17 +15,32 @@ import ( "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/raft" + hashicorpRaft "github.com/hashicorp/raft" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/topology" ) +type RaftServerOption struct { + GrpcDialOption grpc.DialOption + Peers map[string]pb.ServerAddress + ServerAddr pb.ServerAddress + DataDir string + Topo *topology.Topology + RaftResumeState bool + HeartbeatInterval time.Duration + ElectionTimeout time.Duration + RaftBootstrap bool +} + type RaftServer struct { - peers []string // initial peers to join with - raftServer raft.Server - dataDir 
string - serverAddr string - topo *topology.Topology + peers map[string]pb.ServerAddress // initial peers to join with + raftServer raft.Server + RaftHashicorp *hashicorpRaft.Raft + TransportManager *transport.Manager + dataDir string + serverAddr pb.ServerAddress + topo *topology.Topology *raft.GrpcServer } @@ -32,6 +49,8 @@ type StateMachine struct { topo *topology.Topology } +var _ hashicorpRaft.FSM = &StateMachine{} + func (s StateMachine) Save() ([]byte, error) { state := topology.MaxVolumeIdCommand{ MaxVolumeId: s.topo.GetMaxVolumeId(), } @@ -51,12 +70,42 @@ func (s StateMachine) Recovery(data []byte) error { return nil } -func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) { +func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} { + before := s.topo.GetMaxVolumeId() + state := topology.MaxVolumeIdCommand{} + err := json.Unmarshal(l.Data, &state) + if err != nil { + return err + } + s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId) + + glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId()) + return nil +} + +func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) { + return &topology.MaxVolumeIdCommand{ + MaxVolumeId: s.topo.GetMaxVolumeId(), + }, nil +} + +func (s *StateMachine) Restore(r io.ReadCloser) error { + b, err := ioutil.ReadAll(r) + if err != nil { + return err + } + if err := s.Recovery(b); err != nil { + return err + } + return nil +} + +func NewRaftServer(option *RaftServerOption) (*RaftServer, error) { s := &RaftServer{ - peers: peers, - serverAddr: serverAddr, - dataDir: dataDir, - topo: topo, + peers: option.Peers, + serverAddr: option.ServerAddr, + dataDir: option.DataDir, + topo: option.Topo, } if glog.V(4) { @@ -66,27 +115,29 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d raft.RegisterCommand(&topology.MaxVolumeIdCommand{}) var err error - transporter := raft.NewGrpcTransporter(grpcDialOption) - glog.V(0).Infof("Starting RaftServer with %v", serverAddr) + transporter := raft.NewGrpcTransporter(option.GrpcDialOption) + glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr) - if !raftResumeState { + // always clear the previous log to avoid the server being promotable + os.RemoveAll(path.Join(s.dataDir, "log")) + if !option.RaftResumeState { // always clear previous metadata os.RemoveAll(path.Join(s.dataDir, "conf")) - os.RemoveAll(path.Join(s.dataDir, "log")) os.RemoveAll(path.Join(s.dataDir, "snapshot")) } - if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil { + if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), os.ModePerm); err != nil { return nil, err } - stateMachine := StateMachine{topo: topo} - s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "") + stateMachine := StateMachine{topo: option.Topo} + s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "") if err != nil { glog.V(0).Infoln(err) return nil, err } - s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond) - s.raftServer.SetElectionTimeout(10 * time.Second) + heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1)) + s.raftServer.SetHeartbeatInterval(heartbeatInterval) + s.raftServer.SetElectionTimeout(option.ElectionTimeout) if err := s.raftServer.LoadSnapshot(); err != nil { return nil, err } @@ -94,68 +145,53 @@ func
NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d return nil, err } - for _, peer := range s.peers { - if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil { + for name, peer := range s.peers { + if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil { return nil, err } } // Remove deleted peers for existsPeerName := range s.raftServer.Peers() { - exists, existingPeer := false, "" - for _, peer := range s.peers { - if pb.ServerToGrpcAddress(peer) == existsPeerName { - exists, existingPeer = true, peer - break - } - } - if exists { + if existingPeer, found := s.peers[existsPeerName]; !found { if err := s.raftServer.RemovePeer(existsPeerName); err != nil { glog.V(0).Infoln(err) return nil, err } else { - glog.V(0).Infof("removing old peer %s", existingPeer) + glog.V(0).Infof("removing old peer: %s", existingPeer) } } } s.GrpcServer = raft.NewGrpcServer(s.raftServer) - if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) { - // Initialize the server by joining itself. - // s.DoJoinCommand() - } - glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader()) return s, nil } func (s *RaftServer) Peers() (members []string) { - peers := s.raftServer.Peers() - - for _, p := range peers { - members = append(members, p.Name) + if s.raftServer != nil { + peers := s.raftServer.Peers() + for _, p := range peers { + members = append(members, p.Name) + } + } else if s.RaftHashicorp != nil { + cfg := s.RaftHashicorp.GetConfiguration() + for _, p := range cfg.Configuration().Servers { + members = append(members, string(p.ID)) + } } - return } -func isTheFirstOne(self string, peers []string) bool { - sort.Strings(peers) - if len(peers) <= 0 { - return true - } - return self == peers[0] -} - func (s *RaftServer) DoJoinCommand() { glog.V(0).Infoln("Initializing new cluster") if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{ Name: s.raftServer.Name(), - ConnectionString: pb.ServerToGrpcAddress(s.serverAddr), + ConnectionString: s.serverAddr.ToGrpcAddress(), }); err != nil { glog.Errorf("fail to send join command: %v", err) } diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go index 252570eab..cc3e6e37f 100644 --- a/weed/server/raft_server_handlers.go +++ b/weed/server/raft_server_handlers.go @@ -1,15 +1,16 @@ package weed_server import ( + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "net/http" ) type ClusterStatusResult struct { - IsLeader bool `json:"IsLeader,omitempty"` - Leader string `json:"Leader,omitempty"` - Peers []string `json:"Peers,omitempty"` - MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"` + IsLeader bool `json:"IsLeader,omitempty"` + Leader pb.ServerAddress `json:"Leader,omitempty"` + Peers []string `json:"Peers,omitempty"` + MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"` } func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) { @@ -24,3 +25,11 @@ func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) { } writeJsonQuiet(w, r, http.StatusOK, ret) } + +func (s *RaftServer) StatsRaftHandler(w http.ResponseWriter, r *http.Request) { + if s.RaftHashicorp == nil { + writeJsonQuiet(w, r, http.StatusNotFound, nil) + return + } + writeJsonQuiet(w, r, http.StatusOK, s.RaftHashicorp.Stats()) +} diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go index 898c3da12..2ffdf2226 100644 --- a/weed/server/volume_grpc_admin.go +++ 
b/weed/server/volume_grpc_admin.go @@ -3,7 +3,13 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/cluster" + "github.com/chrislusf/seaweedfs/weed/pb" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/util" "path/filepath" + "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" @@ -183,7 +189,12 @@ func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb. func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) { - resp := &volume_server_pb.VolumeServerStatusResponse{} + resp := &volume_server_pb.VolumeServerStatusResponse{ + MemoryStatus: stats.MemStat(), + Version: util.Version(), + DataCenter: vs.dataCenter, + Rack: vs.rack, + } for _, loc := range vs.store.Locations { if dir, e := filepath.Abs(loc.Directory); e == nil { @@ -191,8 +202,6 @@ func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_serv } } - resp.MemoryStatus = stats.MemStat() - return resp, nil } @@ -247,3 +256,41 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv return resp, nil } + +func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequest) (resp *volume_server_pb.PingResponse, pingErr error) { + resp = &volume_server_pb.PingResponse{ + StartTimeNs: time.Now().UnixNano(), + } + if req.TargetType == cluster.FilerType { + pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if req.TargetType == cluster.VolumeServerType { + pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if req.TargetType == cluster.MasterType { + pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client master_pb.SeaweedClient) error { + pingResp, err := client.Ping(ctx, &master_pb.PingRequest{}) + if pingResp != nil { + resp.RemoteTimeNs = pingResp.StartTimeNs + } + return err + }) + } + if pingErr != nil { + pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr) + } + resp.StopTimeNs = time.Now().UnixNano() + return +} diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index f8875169f..078b78eb2 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -2,9 +2,11 @@ package weed_server import ( "fmt" - "github.com/chrislusf/seaweedfs/weed/operation" + "os" "time" + "github.com/chrislusf/seaweedfs/weed/operation" + "google.golang.org/grpc" "github.com/chrislusf/seaweedfs/weed/pb" @@ -19,15 +21,14 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (vs *VolumeServer) GetMaster() string { +func (vs *VolumeServer) GetMaster() pb.ServerAddress { return vs.currentMaster } func (vs *VolumeServer) checkWithMaster() (err error) { - isConnected := false - for !isConnected { + for { for _, master := range 
vs.SeedMasterNodes { - err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { + err = operation.WithMasterServerClient(false, master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) if err != nil { return fmt.Errorf("get master %s configuration: %v", master, err) @@ -44,7 +45,6 @@ func (vs *VolumeServer) checkWithMaster() (err error) { } time.Sleep(1790 * time.Millisecond) } - return } func (vs *VolumeServer) heartbeat() { @@ -56,7 +56,7 @@ func (vs *VolumeServer) heartbeat() { grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.volume") var err error - var newLeader string + var newLeader pb.ServerAddress for vs.isHeartbeating { for _, master := range vs.SeedMasterNodes { if newLeader != "" { @@ -65,13 +65,8 @@ func (vs *VolumeServer) heartbeat() { time.Sleep(3 * time.Second) master = newLeader } - masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(master) - if parseErr != nil { - glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr) - continue - } vs.store.MasterAddress = master - newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second) + newLeader, err = vs.doHeartbeat(master, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second) if err != nil { glog.V(0).Infof("heartbeat error: %v", err) time.Sleep(time.Duration(vs.pulseSeconds) * time.Second) @@ -94,25 +89,25 @@ func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) { return false } -func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) { +func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader pb.ServerAddress, err error) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption) + grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), grpcDialOption) if err != nil { - return "", fmt.Errorf("fail to dial %s : %v", masterNode, err) + return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err) } defer grpcConection.Close() client := master_pb.NewSeaweedClient(grpcConection) stream, err := client.SendHeartbeat(ctx) if err != nil { - glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err) + glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err) return "", err } - glog.V(0).Infof("Heartbeat to: %v", masterNode) - vs.currentMaster = masterNode + glog.V(0).Infof("Heartbeat to: %v", masterAddress) + vs.currentMaster = masterAddress doneChan := make(chan error, 1) @@ -123,17 +118,30 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi doneChan <- err return } + if len(in.DuplicatedUuids) > 0 { + var duplicateDir []string + for _, loc := range vs.store.Locations { + for _, uuid := range in.DuplicatedUuids { + if uuid == loc.DirectoryUuid { + duplicateDir = append(duplicateDir, loc.Directory) + } + } + } + glog.Errorf("Shut down Volume Server due to duplicate volume directories: %v", duplicateDir) + os.Exit(1) + } if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() { vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit()) if 
vs.store.MaybeAdjustVolumeMax() { if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err) + return } } } - if in.GetLeader() != "" && vs.currentMaster != in.GetLeader() { + if in.GetLeader() != "" && string(vs.currentMaster) != in.GetLeader() { glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster) - newLeader = in.GetLeader() + newLeader = pb.ServerAddress(in.GetLeader()) doneChan <- nil return } @@ -141,12 +149,12 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi }() if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err) return "", err } if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err) return "", err } @@ -161,9 +169,9 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi &volumeMessage, }, } - glog.V(1).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id) + glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id) if err = stream.Send(deltaBeat); err != nil { - glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err } case ecShardMessage := <-vs.store.NewEcShardsChan: @@ -172,10 +180,10 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi &ecShardMessage, }, } - glog.V(1).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, + glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds()) if err = stream.Send(deltaBeat); err != nil { - glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err } case volumeMessage := <-vs.store.DeletedVolumesChan: @@ -184,9 +192,9 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi &volumeMessage, }, } - glog.V(1).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id) + glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id) if err = stream.Send(deltaBeat); err != nil { - glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err } case ecShardMessage := <-vs.store.DeletedEcShardsChan: @@ -195,23 +203,23 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi &ecShardMessage, }, } - glog.V(1).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, + glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds()) if err = 
stream.Send(deltaBeat); err != nil { - glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err } case <-volumeTickChan: glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port) vs.store.MaybeAdjustVolumeMax() if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err) return "", err } case <-ecShardTickChan: glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port) if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil { - glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err) return "", err } case err = <-doneChan: @@ -230,7 +238,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi } glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port) if err = stream.Send(emptyBeat); err != nil { - glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) + glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err) return "", err } return diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 2ad77a7ff..b4bc850e2 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -3,26 +3,28 @@ package weed_server import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/pb/master_pb" + "github.com/chrislusf/seaweedfs/weed/storage/backend" "io" - "io/ioutil" "math" "os" "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" + "github.com/chrislusf/seaweedfs/weed/storage/types" "github.com/chrislusf/seaweedfs/weed/util" ) const BufferSizeLimit = 1024 * 1024 * 2 // VolumeCopy copy the .idx .dat .vif files, and mount the volume -func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) { +func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error { v := vs.store.GetVolume(needle.VolumeId(req.VolumeId)) if v != nil { @@ -31,7 +33,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId)) if err != nil { - return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err) + return fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err) } glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId) @@ -45,7 +47,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo // confirm size and timestamp var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string - err := 
operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { + err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error { var err error volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(), &volume_server_pb.ReadVolumeFileStatusRequest{ @@ -67,7 +69,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId)) indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId)) - ioutil.WriteFile(dataBaseFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755) + util.WriteFile(dataBaseFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755) defer func() { if err != nil { @@ -78,18 +80,66 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo } }() + var preallocateSize int64 + if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, func(client master_pb.SeaweedClient) error { + resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) + if err != nil { + return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err) + } + if resp.VolumePreallocate { + preallocateSize = int64(resp.VolumeSizeLimitMB) * (1 << 20) + } + return nil + }); grpcErr != nil { + glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr) + } + + if preallocateSize > 0 { + volumeFile := dataBaseFileName + ".dat" + _, err := backend.CreateVolumeFile(volumeFile, preallocateSize, 0) + if err != nil { + return fmt.Errorf("create volume file %s: %v", volumeFile, err) + } + } + // println("source:", volFileInfoResp.String()) - if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil { + copyResponse := &volume_server_pb.VolumeCopyResponse{} + reportInterval := int64(1024 * 1024 * 128) + nextReportTarget := reportInterval + var modifiedTsNs int64 + var sendErr error + if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool { + if processed > nextReportTarget { + copyResponse.ProcessedBytes = processed + if sendErr = stream.Send(copyResponse); sendErr != nil { + return false + } + nextReportTarget = processed + reportInterval + } + return true + }); err != nil { return err } + if sendErr != nil { + return sendErr + } + if modifiedTsNs > 0 { + os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) + } - if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil { + if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil); err != nil { return err } + if modifiedTsNs > 0 { + os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) + } - if err = vs.doCopyFile(client, false, req.Collection, 
req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil { + if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil); err != nil { return err } + if modifiedTsNs > 0 { + os.Chtimes(dataBaseFileName+".vif", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) + } os.Remove(dataBaseFileName + ".note") @@ -97,10 +147,10 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }) if err != nil { - return nil, err + return err } if dataBaseFileName == "" { - return nil, fmt.Errorf("not found volume %d file", req.VolumeId) + return fmt.Errorf("not found volume %d file", req.VolumeId) } idxFileName = indexBaseFileName + ".idx" @@ -115,21 +165,25 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }() if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16 - return nil, err + return err } // mount the volume err = vs.store.MountVolume(needle.VolumeId(req.VolumeId)) if err != nil { - return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err) + return fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err) } - return &volume_server_pb.VolumeCopyResponse{ + if err = stream.Send(&volume_server_pb.VolumeCopyResponse{ LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second), - }, err + }); err != nil { + glog.Errorf("send response: %v", err) + } + + return err } -func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error { +func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) { copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ VolumeId: vid, @@ -141,15 +195,15 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i IgnoreSourceFileNotFound: ignoreSourceFileNotFound, }) if err != nil { - return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) + return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) } - err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend) + modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend, progressFn) if err != nil { - return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) + return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) } - return nil + return modifiedTsNs, nil } @@ -178,7 +232,7 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse return nil } -func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error { +func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) 
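The progressFn threaded through doCopyFile into writeToFile below implements a cancellable progress callback: VolumeCopy passes one that streams ProcessedBytes back to the caller roughly every 128 MB, and a false return aborts the copy. A self-contained sketch of the same pattern, assuming storage.ProgressFunc is func(processed int64) bool, as its use here implies:

package main

import (
    "bytes"
    "fmt"
    "io"
)

// ProgressFunc mirrors the assumed storage.ProgressFunc: it receives the
// bytes processed so far and returns false to abort the operation.
type ProgressFunc func(processed int64) bool

// copyWithProgress copies in chunks and reports after each chunk, the same
// shape as the writeToFile receive loop.
func copyWithProgress(dst io.Writer, src io.Reader, progressFn ProgressFunc) (int64, error) {
    var progressed int64
    buf := make([]byte, 64*1024)
    for {
        n, err := src.Read(buf)
        if n > 0 {
            if _, werr := dst.Write(buf[:n]); werr != nil {
                return progressed, werr
            }
            progressed += int64(n)
            if progressFn != nil && !progressFn(progressed) {
                return progressed, fmt.Errorf("interrupted copy operation")
            }
        }
        if err == io.EOF {
            return progressed, nil
        }
        if err != nil {
            return progressed, err
        }
    }
}

func main() {
    src := bytes.NewReader(make([]byte, 200*1024))
    n, err := copyWithProgress(io.Discard, src, func(p int64) bool {
        fmt.Println("processed", p, "bytes")
        return true // return false here to cancel the copy
    })
    fmt.Println(n, err)
}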
{ glog.V(4).Infof("writing to %s", fileName) flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC if isAppend { @@ -186,22 +240,32 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s } dst, err := os.OpenFile(fileName, flags, 0644) if err != nil { - return nil + return modifiedTsNs, nil } defer dst.Close() + var progressedBytes int64 for { resp, receiveErr := client.Recv() if receiveErr == io.EOF { break } + if resp != nil && resp.ModifiedTsNs != 0 { + modifiedTsNs = resp.ModifiedTsNs + } if receiveErr != nil { - return fmt.Errorf("receiving %s: %v", fileName, receiveErr) + return modifiedTsNs, fmt.Errorf("receiving %s: %v", fileName, receiveErr) } dst.Write(resp.FileContent) + progressedBytes += int64(len(resp.FileContent)) + if progressFn != nil { + if !progressFn(progressedBytes) { + return modifiedTsNs, fmt.Errorf("interrupted copy operation") + } + } wt.MaybeSlowdown(int64(len(resp.FileContent))) } - return nil + return modifiedTsNs, nil } func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { @@ -239,6 +303,7 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 { return fmt.Errorf("volume %d is compacted", req.VolumeId) } + v.SyncToDisk() fileName = v.FileName(req.Ext) } else { baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext @@ -271,6 +336,12 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v } defer file.Close() + fileInfo, err := file.Stat() + if err != nil { + return err + } + fileModTsNs := fileInfo.ModTime().UnixNano() + buffer := make([]byte, BufferSizeLimit) for bytesToRead > 0 { @@ -290,12 +361,14 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v bytesread = int(bytesToRead) } err = stream.Send(&volume_server_pb.CopyFileResponse{ - FileContent: buffer[:bytesread], + FileContent: buffer[:bytesread], + ModifiedTsNs: fileModTsNs, }) if err != nil { // println("sending", bytesread, "bytes err", err.Error()) return err } + fileModTsNs = 0 // only send once bytesToRead -= int64(bytesread) diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index d7e4f302a..79611f499 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -3,9 +3,7 @@ package weed_server import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/storage/volume_info" "io" - "io/ioutil" "math" "os" "path" @@ -13,11 +11,13 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" + "github.com/chrislusf/seaweedfs/weed/storage/volume_info" "github.com/chrislusf/seaweedfs/weed/util" ) @@ -49,6 +49,17 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) } + shouldCleanup := true + defer func() { + if !shouldCleanup { + return + } + for i := 0; i < 
diff --git a/weed/server/volume_grpc_read_all.go b/weed/server/volume_grpc_read_all.go
new file mode 100644
index 000000000..7fe5bad03
--- /dev/null
+++ b/weed/server/volume_grpc_read_all.go
@@ -0,0 +1,36 @@
+package weed_server
+
+import (
+	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/chrislusf/seaweedfs/weed/storage"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func (vs *VolumeServer) ReadAllNeedles(req *volume_server_pb.ReadAllNeedlesRequest, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer) (err error) {
+
+	for _, vid := range req.VolumeIds {
+		if err := vs.streamReadOneVolume(needle.VolumeId(vid), stream, err); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
+func (vs *VolumeServer) streamReadOneVolume(vid needle.VolumeId, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer, err error) error {
+	v := vs.store.GetVolume(vid)
+	if v == nil {
+		return fmt.Errorf("not found volume id %d", vid)
+	}
+
+	scanner := &storage.VolumeFileScanner4ReadAll{
+		Stream: stream,
+		V:      v,
+	}
+
+	offset := int64(v.SuperBlock.BlockSize())
+
+	err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, offset, scanner)
+
+	return err
+}
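`ReadAllNeedles` is a new server-streaming RPC that scans whole volumes from the superblock onward and streams every needle back to the caller. A hypothetical client sketch, assuming the generated `volume_server_pb` stubs and a volume server gRPC endpoint at `localhost:18080` (both placeholders):

```go
package main

import (
	"context"
	"fmt"
	"io"
	"log"

	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func main() {
	conn, err := grpc.Dial("localhost:18080", grpc.WithTransportCredentials(insecure.NewCredentials()))
	if err != nil {
		log.Fatal(err)
	}
	defer conn.Close()

	client := volume_server_pb.NewVolumeServerClient(conn)
	stream, err := client.ReadAllNeedles(context.Background(), &volume_server_pb.ReadAllNeedlesRequest{
		VolumeIds: []uint32{1}, // volume id is a placeholder
	})
	if err != nil {
		log.Fatal(err)
	}

	// Drain the stream; each message carries one needle.
	var needles int
	for {
		if _, err := stream.Recv(); err != nil {
			if err == io.EOF {
				break
			}
			log.Fatal(err)
		}
		needles++
	}
	fmt.Println("streamed needles:", needles)
}
```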
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
index 0615a96a1..9114db329 100644
--- a/weed/server/volume_grpc_remote.go
+++ b/weed/server/volume_grpc_remote.go
@@ -3,10 +3,14 @@ package weed_server
 import (
 	"context"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/operation"
 	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
 	"github.com/chrislusf/seaweedfs/weed/remote_storage"
+	"github.com/chrislusf/seaweedfs/weed/security"
 	"github.com/chrislusf/seaweedfs/weed/storage/needle"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
+	"sync"
+	"time"
 )
 
 func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) {
@@ -30,16 +34,50 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
 		return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr)
 	}
 
-	n := new(needle.Needle)
-	n.Id = types.NeedleId(req.NeedleId)
-	n.Cookie = types.Cookie(req.Cookie)
-	n.Data, n.DataSize = data, uint32(len(data))
-	// copied from *Needle.prepareWriteBuffer()
-	n.Size = 4 + types.Size(n.DataSize) + 1
-	n.Checksum = needle.NewCRC(n.Data)
-	if _, err = vs.store.WriteVolumeNeedle(v.Id, n, true, false); err != nil {
-		return nil, fmt.Errorf("write needle %d size %d: %v", req.NeedleId, req.Size, err)
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go func() {
+		defer wg.Done()
+		n := new(needle.Needle)
+		n.Id = types.NeedleId(req.NeedleId)
+		n.Cookie = types.Cookie(req.Cookie)
+		n.Data, n.DataSize = data, uint32(len(data))
+		// copied from *Needle.prepareWriteBuffer()
+		n.Size = 4 + types.Size(n.DataSize) + 1
+		n.Checksum = needle.NewCRC(n.Data)
+		n.LastModified = uint64(time.Now().Unix())
+		n.SetHasLastModifiedDate()
+		if _, localWriteErr := vs.store.WriteVolumeNeedle(v.Id, n, true, false); localWriteErr != nil {
+			if err == nil {
+				err = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, localWriteErr)
+			}
+		}
+	}()
+	if len(req.Replicas) > 0 {
+		fileId := needle.NewFileId(v.Id, req.NeedleId, req.Cookie)
+		for _, replica := range req.Replicas {
+			wg.Add(1)
+			go func(targetVolumeServer string) {
+				defer wg.Done()
+				uploadOption := &operation.UploadOption{
+					UploadUrl:         fmt.Sprintf("http://%s/%s?type=replicate", targetVolumeServer, fileId.String()),
+					Filename:          "",
+					Cipher:            false,
+					IsInputCompressed: false,
+					MimeType:          "",
+					PairMap:           nil,
+					Jwt:               security.EncodedJwt(req.Auth),
+				}
+				if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil {
+					if err == nil {
+						err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, replicaWriteErr)
+					}
+				}
+			}(replica.Url)
+		}
 	}
-	return resp, nil
+	wg.Wait()
+
+	return resp, err
 }
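`FetchAndWriteNeedle` now performs the local write and the per-replica uploads concurrently, collecting the first failure into a shared `err` under a `sync.WaitGroup`. The same fan-out can be expressed race-free with `errgroup`; a sketch under that assumption (the helper names are hypothetical, not SeaweedFS APIs):

```go
package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

// fanOutWrite mimics FetchAndWriteNeedle's shape: one local write plus one
// upload per replica, all concurrent, first error wins. errgroup avoids
// unsynchronized writes to a shared error variable.
func fanOutWrite(localWrite func() error, replicaWrites []func() error) error {
	var g errgroup.Group
	g.Go(localWrite)
	for _, w := range replicaWrites {
		w := w // capture the loop variable (needed before Go 1.22)
		g.Go(w)
	}
	return g.Wait() // first non-nil error, after all goroutines finish
}

func main() {
	err := fanOutWrite(
		func() error { return nil }, // local volume write
		[]func() error{
			func() error { return nil },                         // replica 1
			func() error { return fmt.Errorf("replica down") }, // replica 2
		},
	)
	fmt.Println(err)
}
```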
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index 3ea902ed3..4022da44a 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -3,6 +3,7 @@ package weed_server
 import (
 	"context"
 	"fmt"
+	"github.com/chrislusf/seaweedfs/weed/pb"
 	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/glog"
@@ -89,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
 
 	defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
 
-	return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
+	return resp, operation.TailVolumeFromSource(pb.ServerAddress(req.SourceVolumeServer), vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
 		_, err := vs.store.WriteVolumeNeedle(v.Id, n, false, false)
 		return err
 	})
diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go
index e51de5f1d..c690de959 100644
--- a/weed/server/volume_grpc_tier_upload.go
+++ b/weed/server/volume_grpc_tier_upload.go
@@ -27,7 +27,7 @@ func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTi
 	// locate the disk file
 	diskFile, ok := v.DataBackend.(*backend.DiskFile)
 	if !ok {
-		return fmt.Errorf("volume %d is not on local disk", req.VolumeId)
+		return nil // already copied to remote. fmt.Errorf("volume %d is not on local disk", req.VolumeId)
 	}
 
 	// check valid storage backend type
@@ -62,13 +62,8 @@ func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTi
 		})
 	}
 
-	// remember the file original source
-	attributes := make(map[string]string)
-	attributes["volumeId"] = v.Id.String()
-	attributes["collection"] = v.Collection
-	attributes["ext"] = ".dat"
 	// copy the data file
-	key, size, err := backendStorage.CopyFile(diskFile.File, attributes, fn)
+	key, size, err := backendStorage.CopyFile(diskFile.File, fn)
 	if err != nil {
 		return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.Name(), err)
 	}
diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go
index f8d1b7fda..0ab782b02 100644
--- a/weed/server/volume_grpc_vacuum.go
+++ b/weed/server/volume_grpc_vacuum.go
@@ -24,19 +24,35 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve
 
 }
 
-func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_server_pb.VacuumVolumeCompactRequest) (*volume_server_pb.VacuumVolumeCompactResponse, error) {
+func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
 
 	resp := &volume_server_pb.VacuumVolumeCompactResponse{}
-
-	err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond)
+	reportInterval := int64(1024 * 1024 * 128)
+	nextReportTarget := reportInterval
+
+	var sendErr error
+	err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond, func(processed int64) bool {
+		if processed > nextReportTarget {
+			resp.ProcessedBytes = processed
+			if sendErr = stream.Send(resp); sendErr != nil {
+				return false
+			}
+			nextReportTarget = processed + reportInterval
+		}
+		return true
+	})
 	if err != nil {
 		glog.Errorf("compact volume %d: %v", req.VolumeId, err)
-	} else {
-		glog.V(1).Infof("compact volume %d", req.VolumeId)
+		return err
+	}
+	if sendErr != nil {
+		glog.Errorf("compact volume %d report progress: %v", req.VolumeId, sendErr)
+		return sendErr
 	}
 
-	return resp, err
+	glog.V(1).Infof("compact volume %d", req.VolumeId)
+	return nil
 }
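`VacuumVolumeCompact` becomes a server-streaming RPC: the compaction loop invokes a progress callback, and the handler forwards a `ProcessedBytes` update roughly every 128 MiB instead of on every chunk. The throttling idea in isolation, with illustrative names:

```go
package main

import "fmt"

// reportEvery wraps a sink so it fires roughly once per `interval` processed
// bytes, mirroring VacuumVolumeCompact's 128 MiB reporting cadence.
func reportEvery(interval int64, sink func(processed int64) error) func(int64) bool {
	next := interval
	return func(processed int64) bool {
		if processed > next {
			if err := sink(processed); err != nil {
				return false // abort the compaction when the stream is gone
			}
			next = processed + interval
		}
		return true
	}
}

func main() {
	cb := reportEvery(100, func(p int64) error {
		fmt.Println("progress:", p) // stand-in for stream.Send(resp)
		return nil
	})
	for p := int64(0); p <= 1000; p += 37 {
		if !cb(p) {
			break
		}
	}
}
```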
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 034521b4b..abb30229a 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -1,10 +1,13 @@
 package weed_server
 
 import (
-	"fmt"
-	"github.com/chrislusf/seaweedfs/weed/storage/types"
 	"net/http"
 	"sync"
+	"time"
+
+	"github.com/chrislusf/seaweedfs/weed/pb"
+	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+	"github.com/chrislusf/seaweedfs/weed/storage/types"
 
 	"google.golang.org/grpc"
 
@@ -17,15 +20,17 @@ import (
 )
 
 type VolumeServer struct {
+	volume_server_pb.UnimplementedVolumeServerServer
 	inFlightUploadDataSize        int64
 	inFlightDownloadDataSize      int64
 	concurrentUploadLimit         int64
 	concurrentDownloadLimit       int64
 	inFlightUploadDataLimitCond   *sync.Cond
 	inFlightDownloadDataLimitCond *sync.Cond
+	inflightUploadDataTimeout     time.Duration
 
-	SeedMasterNodes []string
-	currentMaster   string
+	SeedMasterNodes []pb.ServerAddress
+	currentMaster   pb.ServerAddress
 	pulseSeconds    int
 	dataCenter      string
 	rack            string
@@ -45,11 +50,11 @@ type VolumeServer struct {
 }
 
 func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
-	port int, publicUrl string,
+	port int, grpcPort int, publicUrl string,
 	folders []string, maxCounts []int, minFreeSpaces []util.MinFreeSpace,
 	diskTypes []types.DiskType,
 	idxFolder string,
 	needleMapKind storage.NeedleMapKind,
-	masterNodes []string, pulseSeconds int,
+	masterNodes []pb.ServerAddress, pulseSeconds int,
 	dataCenter string, rack string,
 	whiteList []string,
 	fixJpgOrientation bool,
@@ -58,6 +63,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 	fileSizeLimitMB int,
 	concurrentUploadLimit int64,
 	concurrentDownloadLimit int64,
+	inflightUploadDataTimeout time.Duration,
 ) *VolumeServer {
 
 	v := util.GetViper()
@@ -86,16 +92,18 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 		inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
 		concurrentUploadLimit:         concurrentUploadLimit,
 		concurrentDownloadLimit:       concurrentDownloadLimit,
+		inflightUploadDataTimeout:     inflightUploadDataTimeout,
 	}
 	vs.SeedMasterNodes = masterNodes
 
 	vs.checkWithMaster()
 
-	vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes)
+	vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes)
 	vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
 
 	handleStaticResources(adminMux)
 	adminMux.HandleFunc("/status", vs.statusHandler)
+	adminMux.HandleFunc("/healthz", vs.healthzHandler)
 	if signingKey == "" || enableUiAccess {
 		// only expose the volume server details for safe environments
 		adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
@@ -113,7 +121,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
 	}
 
 	go vs.heartbeat()
-	go stats.LoopPushingMetric("volumeServer", fmt.Sprintf("%s:%d", ip, port), vs.metricsAddress, vs.metricsIntervalSec)
+	go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec)
 
 	return vs
 }
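Embedding `volume_server_pb.UnimplementedVolumeServerServer` in `VolumeServer` keeps the server compiling as new RPCs are added to the proto; methods that are not overridden fall through to generated stubs that return an Unimplemented error. A toy illustration of that pattern (all types below are fakes, neither gRPC nor SeaweedFS APIs):

```go
package main

import "fmt"

// UnimplementedGreeterServer fakes the generated forward-compatibility base.
type UnimplementedGreeterServer struct{}

func (UnimplementedGreeterServer) SayHello() (string, error) {
	return "", fmt.Errorf("method SayHello not implemented")
}

// greeterServer compiles even if it overrides nothing: new RPCs on the
// embedded base simply answer "not implemented" until implemented here.
type greeterServer struct {
	UnimplementedGreeterServer
}

func main() {
	var s greeterServer
	_, err := s.SayHello() // the stub answers until we override it
	fmt.Println(err)
}
```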
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index ff2eccc11..293f36f14 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -1,10 +1,12 @@
 package weed_server
 
 import (
+	"fmt"
 	"net/http"
 	"strconv"
 	"strings"
 	"sync/atomic"
+	"time"
 
 	"github.com/chrislusf/seaweedfs/weed/util"
 
@@ -39,8 +41,14 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
 		stats.ReadRequest()
 		vs.inFlightDownloadDataLimitCond.L.Lock()
 		for vs.concurrentDownloadLimit != 0 && atomic.LoadInt64(&vs.inFlightDownloadDataSize) > vs.concurrentDownloadLimit {
-			glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit)
-			vs.inFlightDownloadDataLimitCond.Wait()
+			select {
+			case <-r.Context().Done():
+				glog.V(4).Infof("request cancelled from %s: %v", r.RemoteAddr, r.Context().Err())
+				return
+			default:
+				glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit)
+				vs.inFlightDownloadDataLimitCond.Wait()
+			}
 		}
 		vs.inFlightDownloadDataLimitCond.L.Unlock()
 		vs.GetOrHeadHandler(w, r)
@@ -49,18 +57,31 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
 		vs.guard.WhiteList(vs.DeleteHandler)(w, r)
 	case "PUT", "POST":
-		// wait until in flight data is less than the limit
 		contentLength := getContentLength(r)
-		vs.inFlightUploadDataLimitCond.L.Lock()
-		for vs.concurrentUploadLimit != 0 && atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit {
-			glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
-			vs.inFlightUploadDataLimitCond.Wait()
+		// exclude the replication from the concurrentUploadLimitMB
+		if r.URL.Query().Get("type") != "replicate" && vs.concurrentUploadLimit != 0 {
+			startTime := time.Now()
+			vs.inFlightUploadDataLimitCond.L.Lock()
+			for vs.inFlightUploadDataSize > vs.concurrentUploadLimit {
+				//wait timeout check
+				if startTime.Add(vs.inflightUploadDataTimeout).Before(time.Now()) {
+					vs.inFlightUploadDataLimitCond.L.Unlock()
+					err := fmt.Errorf("reject because inflight upload data %d > %d, and wait timeout", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
+					glog.V(1).Infof("too many requests: %v", err)
+					writeJsonError(w, r, http.StatusTooManyRequests, err)
+					return
+				}
+				glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
+				vs.inFlightUploadDataLimitCond.Wait()
+			}
+			vs.inFlightUploadDataLimitCond.L.Unlock()
 		}
-		vs.inFlightUploadDataLimitCond.L.Unlock()
 		atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength)
 		defer func() {
 			atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength)
-			vs.inFlightUploadDataLimitCond.Signal()
+			if vs.concurrentUploadLimit != 0 {
+				vs.inFlightUploadDataLimitCond.Signal()
+			}
 		}()
 		// process uploads
@@ -133,7 +154,7 @@ func (vs *VolumeServer) maybeCheckJwtAuthorization(r *http.Request, vid, fid str
 		return false
 	}
 
-	token, err := security.DecodeJwt(signingKey, tokenStr)
+	token, err := security.DecodeJwt(signingKey, tokenStr, &security.SeaweedFileIdClaims{})
 	if err != nil {
 		glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err)
 		return false
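The upload path now bounds how long a request may block on the in-flight limit: the `sync.Cond` wait loop re-checks a deadline on every wakeup and rejects with HTTP 429 once `inflightUploadDataTimeout` elapses. The shape of that loop in isolation; note that `cond.Wait` itself has no timeout, so the deadline is only observed when some other goroutine signals (a sketch, not the SeaweedFS code):

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// waitWithDeadline loops on a sync.Cond until ready() holds or the deadline
// passes, the same shape as the upload-limit wait above.
func waitWithDeadline(cond *sync.Cond, ready func() bool, timeout time.Duration) bool {
	start := time.Now()
	cond.L.Lock()
	defer cond.L.Unlock()
	for !ready() {
		if start.Add(timeout).Before(time.Now()) {
			return false // caller rejects, e.g. with HTTP 429
		}
		cond.Wait() // only returns when someone calls Signal/Broadcast
	}
	return true
}

func main() {
	cond := sync.NewCond(new(sync.Mutex))
	inFlight := int64(100)
	go func() { // a finishing upload frees capacity and signals waiters
		time.Sleep(50 * time.Millisecond)
		cond.L.Lock()
		inFlight = 0
		cond.L.Unlock()
		cond.Signal()
	}()
	ok := waitWithDeadline(cond, func() bool { return inFlight == 0 }, time.Second)
	fmt.Println("admitted:", ok)
}
```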
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 7e6c06871..37cf109e2 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -1,6 +1,7 @@
 package weed_server
 
 import (
+	"github.com/chrislusf/seaweedfs/weed/topology"
 	"net/http"
 	"path/filepath"
 
@@ -9,6 +10,24 @@ import (
 	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
+func (vs *VolumeServer) healthzHandler(w http.ResponseWriter, r *http.Request) {
+	w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
+	volumeInfos := vs.store.VolumeInfos()
+	for _, vinfo := range volumeInfos {
+		if len(vinfo.Collection) == 0 {
+			continue
+		}
+		if vinfo.ReplicaPlacement.GetCopyCount() > 1 {
+			_, err := topology.GetWritableRemoteReplications(vs.store, vs.grpcDialOption, vinfo.Id, vs.GetMaster)
+			if err != nil {
+				w.WriteHeader(http.StatusServiceUnavailable)
+				return
+			}
+		}
+	}
+	w.WriteHeader(http.StatusOK)
+}
+
 func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
 	m := make(map[string]interface{})
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 5d12108d3..eb5b2be5a 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 	"fmt"
 	"github.com/chrislusf/seaweedfs/weed/storage/types"
+	"github.com/chrislusf/seaweedfs/weed/util/mem"
 	"io"
 	"mime"
 	"net/http"
@@ -29,8 +30,6 @@ var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
 
 func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
 
-	glog.V(9).Info(r.Method + " " + r.URL.Path + " " + r.Header.Get("Range"))
-
 	stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
 	start := time.Now()
 	defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
@@ -103,7 +102,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 			}
 		}
 		w.WriteHeader(response.StatusCode)
-		io.Copy(w, response.Body)
+		buf := mem.Allocate(128 * 1024)
+		defer mem.Free(buf)
+		io.CopyBuffer(w, response.Body, buf)
 		return
 	} else {
 		// redirect
@@ -126,6 +127,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 
 	var count int
 	var needleSize types.Size
+	readOption.AttemptMetaOnly, readOption.MustMetaOnly = shouldAttemptStreamWrite(hasVolume, ext, r)
 	onReadSizeFn := func(size types.Size) {
 		needleSize = size
 		atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(needleSize))
@@ -140,7 +142,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 		vs.inFlightDownloadDataLimitCond.Signal()
 	}()
 
-	if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume {
+	if err != nil && err != storage.ErrorDeleted && hasVolume {
 		glog.V(4).Infof("read needle: %v", err)
 		// start to fix it from other replicas, if not deleted and hasVolume and is not a replicated request
 	}
@@ -217,11 +219,31 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
 		}
 	}
 
-	rs := conditionallyResizeImages(bytes.NewReader(n.Data), ext, r)
+	if !readOption.IsMetaOnly {
+		rs := conditionallyResizeImages(bytes.NewReader(n.Data), ext, r)
+		if e := writeResponseContent(filename, mtype, rs, w, r); e != nil {
+			glog.V(2).Infoln("response write error:", e)
+		}
+	} else {
+		vs.streamWriteResponseContent(filename, mtype, volumeId, n, w, r, readOption)
+	}
+}
 
-	if e := writeResponseContent(filename, mtype, rs, w, r); e != nil {
-		glog.V(2).Infoln("response write error:", e)
+func shouldAttemptStreamWrite(hasLocalVolume bool, ext string, r *http.Request) (shouldAttempt bool, mustMetaOnly bool) {
+	if !hasLocalVolume {
+		return false, false
+	}
+	if len(ext) > 0 {
+		ext = strings.ToLower(ext)
 	}
+	if r.Method == "HEAD" {
+		return true, true
+	}
+	_, _, _, shouldResize := shouldResizeImages(ext, r)
+	if shouldResize {
+		return false, false
+	}
+	return true, false
 }
 
 func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, ext string, w http.ResponseWriter, r *http.Request) (processed bool) {
@@ -301,7 +323,7 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
 	}
 
 	w.Header().Set("Accept-Ranges", "bytes")
-	adjustHeaderContentDisposition(w, r, filename)
+	adjustPassthroughHeaders(w, r, filename)
 
 	if r.Method == "HEAD" {
 		w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
@@ -317,3 +339,27 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
 	})
 	return nil
 }
+
+func (vs *VolumeServer) streamWriteResponseContent(filename string, mimeType string, volumeId needle.VolumeId, n *needle.Needle, w http.ResponseWriter, r *http.Request, readOption *storage.ReadOption) {
+	totalSize := int64(n.DataSize)
+	if mimeType == "" {
+		if ext := filepath.Ext(filename); ext != "" {
+			mimeType = mime.TypeByExtension(ext)
+		}
+	}
+	if mimeType != "" {
+		w.Header().Set("Content-Type", mimeType)
+	}
+	w.Header().Set("Accept-Ranges", "bytes")
+	adjustPassthroughHeaders(w, r, filename)
+
+	if r.Method == "HEAD" {
+		w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+		return
+	}
+
+	processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+		return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size)
+	})
+
+}
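Two read-path changes above cut per-request allocations: proxied responses copy through a reusable 128 KiB buffer from `weed/util/mem` via `io.CopyBuffer`, and eligible reads stream straight from the volume file through `streamWriteResponseContent` instead of materializing `n.Data`. A sketch of the pooled-buffer half, using a plain `sync.Pool` in place of the `mem` package:

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"strings"
	"sync"
)

// bufPool stands in for weed/util/mem: reuse 128 KiB copy buffers instead of
// letting io.Copy allocate a fresh 32 KiB buffer for every proxied response.
var bufPool = sync.Pool{
	New: func() interface{} { b := make([]byte, 128*1024); return &b },
}

func proxyBody(dst io.Writer, src io.Reader) (int64, error) {
	bufPtr := bufPool.Get().(*[]byte)
	defer bufPool.Put(bufPtr)
	return io.CopyBuffer(dst, src, *bufPtr)
}

func main() {
	var out bytes.Buffer
	n, err := proxyBody(&out, strings.NewReader("hello, volume server"))
	fmt.Println(n, err)
}
```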
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index 437e5c45d..2c420c2d6 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -1,6 +1,7 @@
 package weed_server
 
 import (
+	"github.com/chrislusf/seaweedfs/weed/pb"
 	"net/http"
 	"path/filepath"
 	"time"
@@ -35,7 +36,7 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
 	}
 	args := struct {
 		Version       string
-		Masters       []string
+		Masters       []pb.ServerAddress
 		Volumes       interface{}
 		EcVolumes     interface{}
 		RemoteVolumes interface{}
diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go
index a009611da..24ad916e6 100644
--- a/weed/server/volume_server_tcp_handlers_write.go
+++ b/weed/server/volume_server_tcp_handlers_write.go
@@ -3,12 +3,13 @@ package weed_server
 import (
 	"bufio"
 	"fmt"
-	"github.com/chrislusf/seaweedfs/weed/glog"
-	"github.com/chrislusf/seaweedfs/weed/storage/needle"
-	"github.com/chrislusf/seaweedfs/weed/util"
 	"io"
 	"net"
 	"strings"
+
+	"github.com/chrislusf/seaweedfs/weed/glog"
+	"github.com/chrislusf/seaweedfs/weed/storage/needle"
+	"github.com/chrislusf/seaweedfs/weed/util"
 )
 
 func (vs *VolumeServer) HandleTcpConnection(c net.Conn) {
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 68c1f3233..265dea03a 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -5,7 +5,6 @@ import (
 	"context"
 	"fmt"
 	"io"
-	"math"
 	"os"
 	"path"
 	"strings"
@@ -27,19 +26,18 @@ import (
 )
 
 type WebDavOption struct {
-	Filer            string
-	FilerGrpcAddress string
-	DomainName       string
-	BucketsPath      string
-	GrpcDialOption   grpc.DialOption
-	Collection       string
-	Replication      string
-	DiskType         string
-	Uid              uint32
-	Gid              uint32
-	Cipher           bool
-	CacheDir         string
-	CacheSizeMB      int64
+	Filer          pb.ServerAddress
+	DomainName     string
+	BucketsPath    string
+	GrpcDialOption grpc.DialOption
+	Collection     string
+	Replication    string
+	DiskType       string
+	Uid            uint32
+	Gid            uint32
+	Cipher         bool
+	CacheDir       string
+	CacheSizeMB    int64
 }
 
 type WebDavServer struct {
@@ -107,7 +105,7 @@ type WebDavFile struct {
 
 func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
 
-	cacheUniqueId := util.Md5String([]byte("webdav" + option.FilerGrpcAddress + util.Version()))[0:8]
+	cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
 	cacheDir := path.Join(option.CacheDir, cacheUniqueId)
 
 	os.MkdirAll(cacheDir, os.FileMode(0755))
@@ -121,12 +119,12 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
 
 var _ = filer_pb.FilerClient(&WebDavFileSystem{})
 
-func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
 
-	return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+	return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
 		client := filer_pb.NewSeaweedFilerClient(grpcConnection)
 		return fn(client)
-	}, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
+	}, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
 }
 
 func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
@@ -163,7 +161,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
 		return os.ErrExist
 	}
 
-	return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
 		dir, name := util.FullPath(fullDirPath).DirAndName()
 		request := &filer_pb.CreateEntryRequest{
 			Directory: dir,
@@ -213,21 +211,19 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
 	}
 
 	dir, name := util.FullPath(fullFilePath).DirAndName()
-	err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
 		if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
 			Directory: dir,
 			Entry: &filer_pb.Entry{
 				Name:        name,
 				IsDirectory: perm&os.ModeDir > 0,
 				Attributes: &filer_pb.FuseAttributes{
-					Mtime:       time.Now().Unix(),
-					Crtime:      time.Now().Unix(),
-					FileMode:    uint32(perm),
-					Uid:         fs.option.Uid,
-					Gid:         fs.option.Gid,
-					Collection:  fs.option.Collection,
-					Replication: fs.option.Replication,
-					TtlSec:      0,
+					Mtime:    time.Now().Unix(),
+					Crtime:   time.Now().Unix(),
+					FileMode: uint32(perm),
+					Uid:      fs.option.Uid,
+					Gid:      fs.option.Gid,
+					TtlSec:   0,
 				},
 			},
 			Signatures: []int32{fs.signature},
@@ -316,7 +312,7 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string)
 	oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
 	newDir, newBaseName := util.FullPath(newName).DirAndName()
 
-	return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
 
 		request := &filer_pb.AtomicRenameEntryRequest{
 			OldDirectory: oldDir,
@@ -376,7 +372,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
 	var fileId, host string
 	var auth security.EncodedJwt
 
-	if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
 
 		ctx := context.Background()
 
@@ -398,7 +394,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
 			return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
 		}
 
-		fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+		fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
 		f.collection, f.replication = resp.Collection, resp.Replication
 
 		return nil
@@ -413,7 +409,16 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
 	}
 
 	fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
-	uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth)
+	uploadOption := &operation.UploadOption{
+		UploadUrl:         fileUrl,
+		Filename:          f.name,
+		Cipher:            f.fs.option.Cipher,
+		IsInputCompressed: false,
+		MimeType:          "",
+		PairMap:           nil,
+		Jwt:               auth,
+	}
+	uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
 	if flushErr != nil {
 		glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
 		return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
@@ -469,10 +474,8 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
 		f.entry.Chunks = manifestedChunks
 	}
 
-	flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+	flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
 		f.entry.Attributes.Mtime = time.Now().Unix()
-		f.entry.Attributes.Collection = f.collection
-		f.entry.Attributes.Replication = f.replication
 
 		request := &filer_pb.UpdateEntryRequest{
 			Directory: dir,
@@ -532,11 +535,11 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
 		return 0, io.EOF
 	}
 	if f.entryViewCache == nil {
-		f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, math.MaxInt64)
+		f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, fileSize)
 		f.reader = nil
 	}
 	if f.reader == nil {
-		chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
+		chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, fileSize)
 		f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
 	}
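Across this commit, upload call sites migrate from long positional argument lists to a single `operation.UploadOption` struct. A hypothetical standalone use with placeholder file id, URL, and JWT values; a real caller obtains these from an assign request, as `saveDataAsChunk` does above:

```go
package main

import (
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/operation"
	"github.com/chrislusf/seaweedfs/weed/security"
)

func main() {
	// All values below are placeholders: the volume server URL and file id
	// normally come back from a master/filer assign call.
	uploadOption := &operation.UploadOption{
		UploadUrl:         "http://127.0.0.1:8080/3,01637037d6",
		Filename:          "hello.txt",
		Cipher:            false,
		IsInputCompressed: false,
		MimeType:          "text/plain",
		PairMap:           nil,
		Jwt:               security.EncodedJwt(""), // empty when auth is disabled
	}
	uploadResult, err := operation.UploadData([]byte("hello"), uploadOption)
	if err != nil {
		fmt.Println("upload:", err)
		return
	}
	fmt.Printf("upload result: %+v\n", uploadResult)
}
```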
