path: root/weed/server
author     yulai.li <blacktear23@gmail.com>    2022-06-26 22:43:37 +0800
committer  yulai.li <blacktear23@gmail.com>    2022-06-26 22:43:37 +0800
commit     46e0b629e529f3aff535f90dd25eb719adf1c0d0 (patch)
tree       734125b48b6d96f8796a2b89b924312cd169ef0e /weed/server
parent     a5bd0b3a1644a77dcc0b9ff41c4ce8eb3ea0d566 (diff)
parent     dc59ccd110a321db7d0b0480631aa95a3d9ba7e6 (diff)
Update TiKV client version and add 1PC (one-phase commit) support
Diffstat (limited to 'weed/server')
-rw-r--r--  weed/server/common.go                                 |  38
-rw-r--r--  weed/server/filer_grpc_server.go                      | 161
-rw-r--r--  weed/server/filer_grpc_server_admin.go                | 178
-rw-r--r--  weed/server/filer_grpc_server_remote.go               |  40
-rw-r--r--  weed/server/filer_grpc_server_rename.go               | 120
-rw-r--r--  weed/server/filer_grpc_server_sub_meta.go             |  88
-rw-r--r--  weed/server/filer_server.go                           |  65
-rw-r--r--  weed/server/filer_server_handlers.go                  |  78
-rw-r--r--  weed/server/filer_server_handlers_proxy.go            |   6
-rw-r--r--  weed/server/filer_server_handlers_read.go             | 137
-rw-r--r--  weed/server/filer_server_handlers_read_dir.go         |  24
-rw-r--r--  weed/server/filer_server_handlers_tagging.go          |   8
-rw-r--r--  weed/server/filer_server_handlers_write.go            | 107
-rw-r--r--  weed/server/filer_server_handlers_write_autochunk.go  | 102
-rw-r--r--  weed/server/filer_server_handlers_write_cipher.go     |  32
-rw-r--r--  weed/server/filer_server_handlers_write_merge.go      |  11
-rw-r--r--  weed/server/filer_server_handlers_write_upload.go     |  46
-rw-r--r--  weed/server/filer_server_rocksdb.go                   |   1
-rw-r--r--  weed/server/filer_ui/breadcrumb.go                    |   6
-rw-r--r--  weed/server/filer_ui/filer.html                       | 280
-rw-r--r--  weed/server/master_grpc_server.go                     | 170
-rw-r--r--  weed/server/master_grpc_server_admin.go               |  55
-rw-r--r--  weed/server/master_grpc_server_cluster.go             |  41
-rw-r--r--  weed/server/master_grpc_server_collection.go          |   4
-rw-r--r--  weed/server/master_grpc_server_raft.go                |  79
-rw-r--r--  weed/server/master_grpc_server_volume.go              |  47
-rw-r--r--  weed/server/master_server.go                          | 218
-rw-r--r--  weed/server/master_server_handlers.go                 |  12
-rw-r--r--  weed/server/master_server_handlers_admin.go           |  27
-rw-r--r--  weed/server/master_server_handlers_ui.go              |  52
-rw-r--r--  weed/server/master_ui/masterNewRaft.html              | 121
-rw-r--r--  weed/server/master_ui/templates.go                    |   4
-rw-r--r--  weed/server/raft_hashicorp.go                         | 186
-rw-r--r--  weed/server/raft_server.go                            | 136
-rw-r--r--  weed/server/raft_server_handlers.go                   |  17
-rw-r--r--  weed/server/volume_grpc_admin.go                      |  53
-rw-r--r--  weed/server/volume_grpc_client_to_master.go           |  76
-rw-r--r--  weed/server/volume_grpc_copy.go                       | 123
-rw-r--r--  weed/server/volume_grpc_erasure_coding.go             |  31
-rw-r--r--  weed/server/volume_grpc_read_all.go                   |  36
-rw-r--r--  weed/server/volume_grpc_remote.go                     |  58
-rw-r--r--  weed/server/volume_grpc_tail.go                       |   3
-rw-r--r--  weed/server/volume_grpc_tier_upload.go                |   9
-rw-r--r--  weed/server/volume_grpc_vacuum.go                     |  28
-rw-r--r--  weed/server/volume_server.go                          |  24
-rw-r--r--  weed/server/volume_server_handlers.go                 |  41
-rw-r--r--  weed/server/volume_server_handlers_admin.go           |  19
-rw-r--r--  weed/server/volume_server_handlers_read.go            |  62
-rw-r--r--  weed/server/volume_server_handlers_ui.go              |   3
-rw-r--r--  weed/server/volume_server_tcp_handlers_write.go       |   7
-rw-r--r--  weed/server/webdav_server.go                          |  77
51 files changed, 2563 insertions(+), 784 deletions(-)
diff --git a/weed/server/common.go b/weed/server/common.go
index ad3842190..f02ec67ac 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -1,14 +1,17 @@
package weed_server
import (
+ "bufio"
"bytes"
"encoding/json"
"errors"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"io"
"io/fs"
"mime/multipart"
"net/http"
+ "net/url"
"path/filepath"
"strconv"
"strings"
@@ -161,7 +164,16 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
}
debug("upload file to store", url)
- uploadResult, err := operation.UploadData(url, pu.FileName, false, pu.Data, pu.IsGzipped, pu.MimeType, pu.PairMap, assignResult.Auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: url,
+ Filename: pu.FileName,
+ Cipher: false,
+ IsInputCompressed: pu.IsGzipped,
+ MimeType: pu.MimeType,
+ PairMap: pu.PairMap,
+ Jwt: assignResult.Auth,
+ }
+ uploadResult, err := operation.UploadData(pu.Data, uploadOption)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
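
Note: operation.UploadData now takes a single UploadOption struct instead of a long
positional argument list, so every call site names its fields explicitly. A minimal
caller sketch, assuming an assign result already obtained from the master (the URL
and payload below are placeholders):

    uploadOption := &operation.UploadOption{
        UploadUrl:         "http://127.0.0.1:8080/3,01637037d6", // placeholder volume URL + fid
        Filename:          "hello.txt",
        Cipher:            false,
        IsInputCompressed: false,
        MimeType:          "text/plain",
        PairMap:           nil,
        Jwt:               assignResult.Auth, // empty when JWT signing is disabled
    }
    if _, err := operation.UploadData([]byte("hello world"), uploadOption); err != nil {
        glog.Errorf("upload: %v", err)
    }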
@@ -240,8 +252,20 @@ func handleStaticResources2(r *mux.Router) {
r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(http.FS(StaticFS))))
}
+func adjustPassthroughHeaders(w http.ResponseWriter, r *http.Request, filename string) {
+ for header, values := range r.Header {
+ if normalizedHeader, ok := s3_constants.PassThroughHeaders[strings.ToLower(header)]; ok {
+ w.Header()[normalizedHeader] = values
+ }
+ }
+ adjustHeaderContentDisposition(w, r, filename)
+}
func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, filename string) {
+ if contentDisposition := w.Header().Get("Content-Disposition"); contentDisposition != "" {
+ return
+ }
if filename != "" {
+ filename = url.QueryEscape(filename)
contentDisposition := "inline"
if r.FormValue("dl") != "" {
if dl, _ := strconv.ParseBool(r.FormValue("dl")); dl {
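
Note: adjustPassthroughHeaders copies whitelisted request headers onto the response
before applying the Content-Disposition logic, and the latter now backs off if the
pass-through already set one. s3_constants.PassThroughHeaders presumably keys on the
lowercased S3 "response-*" override names; a sketch of the idea with an assumed table:

    // Assumed shape of the table; the real map lives in s3_constants.
    var passThroughHeaders = map[string]string{
        "response-cache-control":       "Cache-Control",
        "response-content-disposition": "Content-Disposition",
        "response-content-type":        "Content-Type",
    }

    func applyPassThrough(w http.ResponseWriter, r *http.Request) {
        for header, values := range r.Header {
            if normalized, ok := passThroughHeaders[strings.ToLower(header)]; ok {
                w.Header()[normalized] = values // keep multi-valued headers intact
            }
        }
    }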
@@ -254,10 +278,13 @@ func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, file
func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64, mimeType string, writeFn func(writer io.Writer, offset int64, size int64) error) {
rangeReq := r.Header.Get("Range")
+ bufferedWriter := bufio.NewWriterSize(w, 128*1024)
+ defer bufferedWriter.Flush()
if rangeReq == "" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
- if err := writeFn(w, 0, totalSize); err != nil {
+ if err := writeFn(bufferedWriter, 0, totalSize); err != nil {
+ glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -268,6 +295,7 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
//mostly copy from src/pkg/net/http/fs.go
ranges, err := parseRange(rangeReq, totalSize)
if err != nil {
+ glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusRequestedRangeNotSatisfiable)
return
}
@@ -298,8 +326,9 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Range", ra.contentRange(totalSize))
w.WriteHeader(http.StatusPartialContent)
- err = writeFn(w, ra.start, ra.length)
+ err = writeFn(bufferedWriter, ra.start, ra.length)
if err != nil {
+ glog.Errorf("processRangeRequest headers: %+v err: %v", w.Header(), err)
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
@@ -338,7 +367,8 @@ func processRangeRequest(r *http.Request, w http.ResponseWriter, totalSize int64
w.Header().Set("Content-Length", strconv.FormatInt(sendSize, 10))
}
w.WriteHeader(http.StatusPartialContent)
- if _, err := io.CopyN(w, sendContent, sendSize); err != nil {
+ if _, err := io.CopyN(bufferedWriter, sendContent, sendSize); err != nil {
+ glog.Errorf("processRangeRequest err: %v", err)
http.Error(w, "Internal Error", http.StatusInternalServerError)
return
}
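
Note: every write path in processRangeRequest now goes through one bufio.Writer, so
many small chunk writes are coalesced into 128KB flushes to the socket, and the
deferred Flush covers every return path. A self-contained sketch of the pattern:

    // Sketch only: serve a whole body through a buffered writer.
    func serveWhole(w http.ResponseWriter, body io.Reader, totalSize int64) error {
        bufferedWriter := bufio.NewWriterSize(w, 128*1024) // coalesce small writes
        defer bufferedWriter.Flush()                       // flush the tail on any return
        w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
        _, err := io.CopyN(bufferedWriter, body, totalSize)
        return err
    }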
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index e025e73dc..17d17c588 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -107,6 +107,7 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
locs = append(locs, &filer_pb.Location{
Url: loc.Url,
PublicUrl: loc.PublicUrl,
+ GrpcPort: uint32(loc.GrpcPort),
})
}
resp.LocationsMap[vidString] = &filer_pb.Locations{
@@ -143,10 +144,15 @@ func (fs *FilerServer) CreateEntry(ctx context.Context, req *filer_pb.CreateEntr
return &filer_pb.CreateEntryResponse{}, fmt.Errorf("CreateEntry cleanupChunks %s %s: %v", req.Directory, req.Entry.Name, err2)
}
+ so, err := fs.detectStorageOption(string(util.NewFullPath(req.Directory, req.Entry.Name)), "", "", 0, "", "", "", "")
+ if err != nil {
+ return nil, err
+ }
newEntry := filer.FromPbEntry(req.Directory, req.Entry)
newEntry.Chunks = chunks
+ newEntry.TtlSec = so.TtlSeconds
- createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures)
+ createErr := fs.filer.CreateEntry(ctx, newEntry, req.OExcl, req.IsFromOtherCluster, req.Signatures, req.SkipCheckParentDirectory)
if createErr == nil {
fs.filer.DeleteChunks(garbage)
@@ -210,10 +216,11 @@ func (fs *FilerServer) cleanupChunks(fullpath string, existingEntry *filer.Entry
if newEntry.Attributes != nil {
so, _ := fs.detectStorageOption(fullpath,
- newEntry.Attributes.Collection,
- newEntry.Attributes.Replication,
+ "",
+ "",
newEntry.Attributes.TtlSec,
- newEntry.Attributes.DiskType,
+ "",
+ "",
"",
"",
) // ignore readonly error for capacity needed to manifestize
@@ -257,7 +264,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
}
entry.Chunks = append(entry.Chunks, req.Chunks...)
- so, err := fs.detectStorageOption(string(fullpath), entry.Collection, entry.Replication, entry.TtlSec, entry.DiskType, "", "")
+ so, err := fs.detectStorageOption(string(fullpath), "", "", entry.TtlSec, "", "", "", "")
if err != nil {
glog.Warningf("detectStorageOption: %v", err)
return &filer_pb.AppendToEntryResponse{}, err
@@ -268,7 +275,7 @@ func (fs *FilerServer) AppendToEntry(ctx context.Context, req *filer_pb.AppendTo
glog.V(0).Infof("MaybeManifestize: %v", err)
}
- err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil)
+ err = fs.filer.CreateEntry(context.Background(), entry, false, false, nil, false)
return &filer_pb.AppendToEntryResponse{}, err
}
@@ -287,7 +294,7 @@ func (fs *FilerServer) DeleteEntry(ctx context.Context, req *filer_pb.DeleteEntr
func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVolumeRequest) (resp *filer_pb.AssignVolumeResponse, err error) {
- so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack)
+ so, err := fs.detectStorageOption(req.Path, req.Collection, req.Replication, req.TtlSec, req.DiskType, req.DataCenter, req.Rack, req.DataNode)
if err != nil {
glog.V(3).Infof("AssignVolume: %v", err)
return &filer_pb.AssignVolumeResponse{Error: fmt.Sprintf("assign volume: %v", err)}, nil
@@ -306,10 +313,13 @@ func (fs *FilerServer) AssignVolume(ctx context.Context, req *filer_pb.AssignVol
}
return &filer_pb.AssignVolumeResponse{
- FileId: assignResult.Fid,
- Count: int32(assignResult.Count),
- Url: assignResult.Url,
- PublicUrl: assignResult.PublicUrl,
+ FileId: assignResult.Fid,
+ Count: int32(assignResult.Count),
+ Location: &filer_pb.Location{
+ Url: assignResult.Url,
+ PublicUrl: assignResult.PublicUrl,
+ GrpcPort: uint32(assignResult.GrpcPort),
+ },
Auth: string(assignResult.Auth),
Collection: so.Collection,
Replication: so.Replication,
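
Note: the assign response now nests the volume server coordinates in a Location
message that carries the gRPC port next to the HTTP url. A hypothetical client-side
use, pairing the two with pb.NewServerAddressWithGrpcPort the same way the remote
cache path below does (grpcDialOption is assumed to be in scope):

    resp, err := client.AssignVolume(ctx, req)
    if err != nil {
        return err
    }
    loc := resp.GetLocation()
    addr := pb.NewServerAddressWithGrpcPort(loc.GetUrl(), int(loc.GetGrpcPort()))
    return operation.WithVolumeServerClient(false, addr, grpcDialOption,
        func(c volume_server_pb.VolumeServerClient) error {
            return nil // talk to the assigned volume server over gRPC here
        })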
@@ -321,7 +331,7 @@ func (fs *FilerServer) CollectionList(ctx context.Context, req *filer_pb.Collect
glog.V(4).Infof("CollectionList %v", req)
resp = &filer_pb.CollectionListResponse{}
- err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
masterResp, err := client.CollectionList(context.Background(), &master_pb.CollectionListRequest{
IncludeNormalVolumes: req.IncludeNormalVolumes,
IncludeEcVolumes: req.IncludeEcVolumes,
@@ -342,7 +352,7 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet
glog.V(4).Infof("DeleteCollection %v", req)
- err = fs.filer.MasterClient.WithClient(func(client master_pb.SeaweedClient) error {
+ err = fs.filer.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
_, err := client.CollectionDelete(context.Background(), &master_pb.CollectionDeleteRequest{
Name: req.GetCollection(),
})
@@ -351,128 +361,3 @@ func (fs *FilerServer) DeleteCollection(ctx context.Context, req *filer_pb.Delet
return &filer_pb.DeleteCollectionResponse{}, err
}
-
-func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
-
- var output *master_pb.StatisticsResponse
-
- err = fs.filer.MasterClient.WithClient(func(masterClient master_pb.SeaweedClient) error {
- grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
- Replication: req.Replication,
- Collection: req.Collection,
- Ttl: req.Ttl,
- DiskType: req.DiskType,
- })
- if grpcErr != nil {
- return grpcErr
- }
-
- output = grpcResponse
- return nil
- })
-
- if err != nil {
- return nil, err
- }
-
- return &filer_pb.StatisticsResponse{
- TotalSize: output.TotalSize,
- UsedSize: output.UsedSize,
- FileCount: output.FileCount,
- }, nil
-}
-
-func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
-
- clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
-
- t := &filer_pb.GetFilerConfigurationResponse{
- Masters: fs.option.Masters,
- Collection: fs.option.Collection,
- Replication: fs.option.DefaultReplication,
- MaxMb: uint32(fs.option.MaxMB),
- DirBuckets: fs.filer.DirBucketsPath,
- Cipher: fs.filer.Cipher,
- Signature: fs.filer.Signature,
- MetricsAddress: fs.metricsAddress,
- MetricsIntervalSec: int32(fs.metricsIntervalSec),
- Version: util.Version(),
- ClusterId: string(clusterId),
- }
-
- glog.V(4).Infof("GetFilerConfiguration: %v", t)
-
- return t, nil
-}
-
-func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error {
-
- req, err := stream.Recv()
- if err != nil {
- return err
- }
-
- clientName := fmt.Sprintf("%s:%d", req.Name, req.GrpcPort)
- m := make(map[string]bool)
- for _, tp := range req.Resources {
- m[tp] = true
- }
- fs.brokersLock.Lock()
- fs.brokers[clientName] = m
- glog.V(0).Infof("+ broker %v", clientName)
- fs.brokersLock.Unlock()
-
- defer func() {
- fs.brokersLock.Lock()
- delete(fs.brokers, clientName)
- glog.V(0).Infof("- broker %v: %v", clientName, err)
- fs.brokersLock.Unlock()
- }()
-
- for {
- if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil {
- glog.V(0).Infof("send broker %v: %+v", clientName, err)
- return err
- }
- // println("replied")
-
- if _, err := stream.Recv(); err != nil {
- glog.V(0).Infof("recv broker %v: %v", clientName, err)
- return err
- }
- // println("received")
- }
-
-}
-
-func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
-
- resp = &filer_pb.LocateBrokerResponse{}
-
- fs.brokersLock.Lock()
- defer fs.brokersLock.Unlock()
-
- var localBrokers []*filer_pb.LocateBrokerResponse_Resource
-
- for b, m := range fs.brokers {
- if _, found := m[req.Resource]; found {
- resp.Found = true
- resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
- {
- GrpcAddresses: b,
- ResourceCount: int32(len(m)),
- },
- }
- return
- }
- localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
- GrpcAddresses: b,
- ResourceCount: int32(len(m)),
- })
- }
-
- resp.Resources = localBrokers
-
- return resp, nil
-
-}
diff --git a/weed/server/filer_grpc_server_admin.go b/weed/server/filer_grpc_server_admin.go
new file mode 100644
index 000000000..df5b8fa1e
--- /dev/null
+++ b/weed/server/filer_grpc_server_admin.go
@@ -0,0 +1,178 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+ "time"
+)
+
+func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsRequest) (resp *filer_pb.StatisticsResponse, err error) {
+
+ var output *master_pb.StatisticsResponse
+
+ err = fs.filer.MasterClient.WithClient(false, func(masterClient master_pb.SeaweedClient) error {
+ grpcResponse, grpcErr := masterClient.Statistics(context.Background(), &master_pb.StatisticsRequest{
+ Replication: req.Replication,
+ Collection: req.Collection,
+ Ttl: req.Ttl,
+ DiskType: req.DiskType,
+ })
+ if grpcErr != nil {
+ return grpcErr
+ }
+
+ output = grpcResponse
+ return nil
+ })
+
+ if err != nil {
+ return nil, err
+ }
+
+ return &filer_pb.StatisticsResponse{
+ TotalSize: output.TotalSize,
+ UsedSize: output.UsedSize,
+ FileCount: output.FileCount,
+ }, nil
+}
+
+func (fs *FilerServer) Ping(ctx context.Context, req *filer_pb.PingRequest) (resp *filer_pb.PingResponse, pingErr error) {
+ resp = &filer_pb.PingResponse{
+ StartTimeNs: time.Now().UnixNano(),
+ }
+ if req.TargetType == cluster.FilerType {
+ pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if req.TargetType == cluster.VolumeServerType {
+ pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if req.TargetType == cluster.MasterType {
+ pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), fs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if pingErr != nil {
+ pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
+ }
+ resp.StopTimeNs = time.Now().UnixNano()
+ return
+}
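
Note: Ping stamps StartTimeNs/StopTimeNs locally and relays the target's own start
time as RemoteTimeNs, which is enough for a caller to estimate round-trip time and,
assuming symmetric network delay, a rough clock offset. A hypothetical caller:

    resp, err := filerClient.Ping(ctx, &filer_pb.PingRequest{
        Target:     "volume1:8080", // placeholder address
        TargetType: cluster.VolumeServerType,
    })
    if err == nil {
        rtt := resp.StopTimeNs - resp.StartTimeNs
        offset := resp.RemoteTimeNs - (resp.StartTimeNs + rtt/2) // midpoint estimate
        glog.V(0).Infof("rtt=%v offset=%v", time.Duration(rtt), time.Duration(offset))
    }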
+
+func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
+
+ clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
+
+ t := &filer_pb.GetFilerConfigurationResponse{
+ Masters: pb.ToAddressStringsFromMap(fs.option.Masters),
+ Collection: fs.option.Collection,
+ Replication: fs.option.DefaultReplication,
+ MaxMb: uint32(fs.option.MaxMB),
+ DirBuckets: fs.filer.DirBucketsPath,
+ Cipher: fs.filer.Cipher,
+ Signature: fs.filer.Signature,
+ MetricsAddress: fs.metricsAddress,
+ MetricsIntervalSec: int32(fs.metricsIntervalSec),
+ Version: util.Version(),
+ ClusterId: string(clusterId),
+ FilerGroup: fs.option.FilerGroup,
+ }
+
+ glog.V(4).Infof("GetFilerConfiguration: %v", t)
+
+ return t, nil
+}
+
+func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedServer) error {
+
+ req, err := stream.Recv()
+ if err != nil {
+ return err
+ }
+
+ clientName := util.JoinHostPort(req.Name, int(req.GrpcPort))
+ m := make(map[string]bool)
+ for _, tp := range req.Resources {
+ m[tp] = true
+ }
+ fs.brokersLock.Lock()
+ fs.brokers[clientName] = m
+ glog.V(0).Infof("+ broker %v", clientName)
+ fs.brokersLock.Unlock()
+
+ defer func() {
+ fs.brokersLock.Lock()
+ delete(fs.brokers, clientName)
+ glog.V(0).Infof("- broker %v: %v", clientName, err)
+ fs.brokersLock.Unlock()
+ }()
+
+ for {
+ if err := stream.Send(&filer_pb.KeepConnectedResponse{}); err != nil {
+ glog.V(0).Infof("send broker %v: %+v", clientName, err)
+ return err
+ }
+ // println("replied")
+
+ if _, err := stream.Recv(); err != nil {
+ glog.V(0).Infof("recv broker %v: %v", clientName, err)
+ return err
+ }
+ // println("received")
+ }
+
+}
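
Note: the broker side of KeepConnected (assumed here) registers its name, port, and
resources in the first Send and then echoes the server's heartbeats; an error on
either side tears the registration down through the deferred cleanup above.

    stream, err := filerClient.KeepConnected(ctx)
    if err != nil {
        return err
    }
    // the first message registers this broker and its resources
    if err := stream.Send(&filer_pb.KeepConnectedRequest{
        Name:      "broker1", // placeholder
        GrpcPort:  17777,
        Resources: []string{"topic/chat"},
    }); err != nil {
        return err
    }
    for {
        if _, err := stream.Recv(); err != nil { // wait for a server heartbeat
            return err
        }
        if err := stream.Send(&filer_pb.KeepConnectedRequest{}); err != nil {
            return err
        }
    }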
+
+func (fs *FilerServer) LocateBroker(ctx context.Context, req *filer_pb.LocateBrokerRequest) (resp *filer_pb.LocateBrokerResponse, err error) {
+
+ resp = &filer_pb.LocateBrokerResponse{}
+
+ fs.brokersLock.Lock()
+ defer fs.brokersLock.Unlock()
+
+ var localBrokers []*filer_pb.LocateBrokerResponse_Resource
+
+ for b, m := range fs.brokers {
+ if _, found := m[req.Resource]; found {
+ resp.Found = true
+ resp.Resources = []*filer_pb.LocateBrokerResponse_Resource{
+ {
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ },
+ }
+ return
+ }
+ localBrokers = append(localBrokers, &filer_pb.LocateBrokerResponse_Resource{
+ GrpcAddresses: b,
+ ResourceCount: int32(len(m)),
+ })
+ }
+
+ resp.Resources = localBrokers
+
+ return resp, nil
+
+}
diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go
index 8064431c5..3be986023 100644
--- a/weed/server/filer_grpc_server_remote.go
+++ b/weed/server/filer_grpc_server_remote.go
@@ -3,20 +3,22 @@ package weed_server
import (
"context"
"fmt"
+ "strings"
+ "sync"
+ "time"
+
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
- "strings"
- "sync"
- "time"
)
-func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.DownloadToLocalRequest) (*filer_pb.DownloadToLocalResponse, error) {
+func (fs *FilerServer) CacheRemoteObjectToLocalCluster(ctx context.Context, req *filer_pb.CacheRemoteObjectToLocalClusterRequest) (*filer_pb.CacheRemoteObjectToLocalClusterResponse, error) {
// load all mappings
mappingEntry, err := fs.filer.FindEntry(ctx, util.JoinPath(filer.DirectoryEtcRemote, filer.REMOTE_STORAGE_MOUNT_FILE))
@@ -56,14 +58,13 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
return nil, err
}
- resp := &filer_pb.DownloadToLocalResponse{}
+ resp := &filer_pb.CacheRemoteObjectToLocalClusterResponse{}
if entry.Remote == nil || entry.Remote.RemoteSize == 0 {
return resp, nil
}
// detect storage option
- // replication level is set to "000" to ensure only need to ask one volume server to fetch the data.
- so, err := fs.detectStorageOption(req.Directory, "", "000", 0, "", "", "")
+ so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "", "")
if err != nil {
return resp, err
}
@@ -111,14 +112,26 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
return
}
+ var replicas []*volume_server_pb.FetchAndWriteNeedleRequest_Replica
+ for _, r := range assignResult.Replicas {
+ replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{
+ Url: r.Url,
+ PublicUrl: r.PublicUrl,
+ GrpcPort: int32(r.GrpcPort),
+ })
+ }
+
// tell filer to tell volume server to download into needles
- err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ assignedServerAddress := pb.NewServerAddressWithGrpcPort(assignResult.Url, assignResult.GrpcPort)
+ err = operation.WithVolumeServerClient(false, assignedServerAddress, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
- VolumeId: uint32(fileId.VolumeId),
- NeedleId: uint64(fileId.Key),
- Cookie: uint32(fileId.Cookie),
- Offset: localOffset,
- Size: size,
+ VolumeId: uint32(fileId.VolumeId),
+ NeedleId: uint64(fileId.Key),
+ Cookie: uint32(fileId.Cookie),
+ Offset: localOffset,
+ Size: size,
+ Replicas: replicas,
+ Auth: string(assignResult.Auth),
RemoteConf: storageConf,
RemoteLocation: &remote_pb.RemoteStorageLocation{
Name: remoteStorageMountedLocation.Name,
@@ -166,6 +179,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
// this skips meta data log events
if err := fs.filer.Store.UpdateEntry(context.Background(), newEntry); err != nil {
+ fs.filer.DeleteChunks(chunks)
return nil, err
}
fs.filer.DeleteChunks(garbage)
diff --git a/weed/server/filer_grpc_server_rename.go b/weed/server/filer_grpc_server_rename.go
index 8a11c91e3..7d6650b53 100644
--- a/weed/server/filer_grpc_server_rename.go
+++ b/weed/server/filer_grpc_server_rename.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"path/filepath"
+ "time"
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -33,7 +34,7 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return nil, fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
}
- moveErr := fs.moveEntry(ctx, oldParent, oldEntry, newParent, req.NewName, req.Signatures)
+ moveErr := fs.moveEntry(ctx, nil, oldParent, oldEntry, newParent, req.NewName, req.Signatures)
if moveErr != nil {
fs.filer.RollbackTransaction(ctx)
return nil, fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
@@ -47,11 +48,64 @@ func (fs *FilerServer) AtomicRenameEntry(ctx context.Context, req *filer_pb.Atom
return &filer_pb.AtomicRenameEntryResponse{}, nil
}
-func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
+func (fs *FilerServer) StreamRenameEntry(req *filer_pb.StreamRenameEntryRequest, stream filer_pb.SeaweedFiler_StreamRenameEntryServer) (err error) {
- if err := fs.moveSelfEntry(ctx, oldParent, entry, newParent, newName, func() error {
+ glog.V(1).Infof("StreamRenameEntry %v", req)
+
+ oldParent := util.FullPath(filepath.ToSlash(req.OldDirectory))
+ newParent := util.FullPath(filepath.ToSlash(req.NewDirectory))
+
+ if err := fs.filer.CanRename(oldParent, newParent); err != nil {
+ return err
+ }
+
+ ctx := context.Background()
+
+ ctx, err = fs.filer.BeginTransaction(ctx)
+ if err != nil {
+ return err
+ }
+
+ oldEntry, err := fs.filer.FindEntry(ctx, oldParent.Child(req.OldName))
+ if err != nil {
+ fs.filer.RollbackTransaction(ctx)
+ return fmt.Errorf("%s/%s not found: %v", req.OldDirectory, req.OldName, err)
+ }
+
+ if oldEntry.IsDirectory() {
+ // follow https://pubs.opengroup.org/onlinepubs/000095399/functions/rename.html
+ targetDir := newParent.Child(req.NewName)
+ newEntry, err := fs.filer.FindEntry(ctx, targetDir)
+ if err == nil {
+ if !newEntry.IsDirectory() {
+ fs.filer.RollbackTransaction(ctx)
+ return fmt.Errorf("%s is not directory", targetDir)
+ }
+ if entries, _, _ := fs.filer.ListDirectoryEntries(context.Background(), targetDir, "", false, 1, "", "", ""); len(entries) > 0 {
+ return fmt.Errorf("%s is not empty", targetDir)
+ }
+ }
+ }
+
+ moveErr := fs.moveEntry(ctx, stream, oldParent, oldEntry, newParent, req.NewName, req.Signatures)
+ if moveErr != nil {
+ fs.filer.RollbackTransaction(ctx)
+ return fmt.Errorf("%s/%s move error: %v", req.OldDirectory, req.OldName, moveErr)
+ } else {
+ if commitError := fs.filer.CommitTransaction(ctx); commitError != nil {
+ fs.filer.RollbackTransaction(ctx)
+ return fmt.Errorf("%s/%s move commit error: %v", req.OldDirectory, req.OldName, commitError)
+ }
+ }
+
+ return nil
+}
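
Note: unlike AtomicRenameEntry, StreamRenameEntry pushes one event per moved entry
back to the caller, so a client can mirror a deep directory move incrementally. A
hypothetical consumer:

    stream, err := filerClient.StreamRenameEntry(ctx, &filer_pb.StreamRenameEntryRequest{
        OldDirectory: "/buckets/a", // placeholder paths
        OldName:      "old",
        NewDirectory: "/buckets/a",
        NewName:      "new",
    })
    if err != nil {
        return err
    }
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break // the rename has been committed
        }
        if err != nil {
            return err
        }
        glog.V(2).Infof("rename event in %s at %d", resp.Directory, resp.TsNs)
    }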
+
+func (fs *FilerServer) moveEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
+
+ if err := fs.moveSelfEntry(ctx, stream, oldParent, entry, newParent, newName, func() error {
if entry.IsDirectory() {
- if err := fs.moveFolderSubEntries(ctx, oldParent, entry, newParent, newName, signatures); err != nil {
+ if err := fs.moveFolderSubEntries(ctx, stream, oldParent, entry, newParent, newName, signatures); err != nil {
return err
}
}
@@ -63,7 +117,7 @@ func (fs *FilerServer) moveEntry(ctx context.Context, oldParent util.FullPath, e
return nil
}
-func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
+func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, signatures []int32) error {
currentDirPath := oldParent.Child(entry.Name())
newDirPath := newParent.Child(newName)
@@ -84,7 +138,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
for _, item := range entries {
lastFileName = item.Name()
// println("processing", lastFileName)
- err := fs.moveEntry(ctx, currentDirPath, item, newDirPath, item.Name(), signatures)
+ err := fs.moveEntry(ctx, stream, currentDirPath, item, newDirPath, item.Name(), signatures)
if err != nil {
return err
}
@@ -96,7 +150,7 @@ func (fs *FilerServer) moveFolderSubEntries(ctx context.Context, oldParent util.
return nil
}
-func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error {
+func (fs *FilerServer) moveSelfEntry(ctx context.Context, stream filer_pb.SeaweedFiler_StreamRenameEntryServer, oldParent util.FullPath, entry *filer.Entry, newParent util.FullPath, newName string, moveFolderSubEntries func() error, signatures []int32) error {
oldPath, newPath := oldParent.Child(entry.Name()), newParent.Child(newName)
@@ -109,15 +163,37 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
// add to new directory
newEntry := &filer.Entry{
- FullPath: newPath,
- Attr: entry.Attr,
- Chunks: entry.Chunks,
- Extended: entry.Extended,
- Content: entry.Content,
+ FullPath: newPath,
+ Attr: entry.Attr,
+ Chunks: entry.Chunks,
+ Extended: entry.Extended,
+ Content: entry.Content,
+ HardLinkCounter: entry.HardLinkCounter,
+ HardLinkId: entry.HardLinkId,
+ Remote: entry.Remote,
+ Quota: entry.Quota,
}
- if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures); createErr != nil {
+ if createErr := fs.filer.CreateEntry(ctx, newEntry, false, false, signatures, false); createErr != nil {
return createErr
}
+ if stream != nil {
+ if err := stream.Send(&filer_pb.StreamRenameEntryResponse{
+ Directory: string(oldParent),
+ EventNotification: &filer_pb.EventNotification{
+ OldEntry: &filer_pb.Entry{
+ Name: entry.Name(),
+ },
+ NewEntry: newEntry.ToProtoEntry(),
+ DeleteChunks: false,
+ NewParentPath: string(newParent),
+ IsFromOtherCluster: false,
+ Signatures: nil,
+ },
+ TsNs: time.Now().UnixNano(),
+ }); err != nil {
+ return err
+ }
+ }
if moveFolderSubEntries != nil {
if moveChildrenErr := moveFolderSubEntries(); moveChildrenErr != nil {
@@ -130,6 +206,24 @@ func (fs *FilerServer) moveSelfEntry(ctx context.Context, oldParent util.FullPat
if deleteErr != nil {
return deleteErr
}
+ if stream != nil {
+ if err := stream.Send(&filer_pb.StreamRenameEntryResponse{
+ Directory: string(oldParent),
+ EventNotification: &filer_pb.EventNotification{
+ OldEntry: &filer_pb.Entry{
+ Name: entry.Name(),
+ },
+ NewEntry: nil,
+ DeleteChunks: false,
+ NewParentPath: "",
+ IsFromOtherCluster: false,
+ Signatures: nil,
+ },
+ TsNs: time.Now().UnixNano(),
+ }); err != nil {
+ return err
+ }
+ }
return nil
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 3fdac1b26..da710234b 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -2,6 +2,7 @@ package weed_server
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"strings"
"time"
@@ -23,9 +24,11 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
peerAddress := findClientAddress(stream.Context(), 0)
- clientName := fs.addClient(req.ClientName, peerAddress)
-
- defer fs.deleteClient(clientName)
+ alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId)
+ if alreadyKnown {
+ return fmt.Errorf("duplicated subscription detected for client %s id %d", clientName, req.ClientId)
+ }
+ defer fs.deleteClient(clientName, req.ClientId)
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v starts to subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
@@ -37,28 +40,27 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
var processedTsNs int64
var readPersistedLogErr error
var readInMemoryLogErr error
+ var isDone bool
for {
glog.V(4).Infof("read on disk %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
if readPersistedLogErr != nil {
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
+ if isDone {
+ return nil
+ }
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
- } else {
- if readInMemoryLogErr == log_buffer.ResumeFromDiskError {
- time.Sleep(1127 * time.Millisecond)
- continue
- }
}
glog.V(4).Infof("read in memory %v aggregated subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- lastReadTime, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, func() bool {
+ lastReadTime, isDone, readInMemoryLogErr = fs.filer.MetaAggregator.MetaLogBuffer.LoopProcessLogData("aggMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
fs.filer.MetaAggregator.ListenersLock.Lock()
fs.filer.MetaAggregator.ListenersCond.Wait()
fs.filer.MetaAggregator.ListenersLock.Unlock()
@@ -73,6 +75,9 @@ func (fs *FilerServer) SubscribeMetadata(req *filer_pb.SubscribeMetadataRequest,
break
}
}
+ if isDone {
+ return nil
+ }
time.Sleep(1127 * time.Millisecond)
}
@@ -85,9 +90,11 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
peerAddress := findClientAddress(stream.Context(), 0)
- clientName := fs.addClient(req.ClientName, peerAddress)
-
- defer fs.deleteClient(clientName)
+ alreadyKnown, clientName := fs.addClient(req.ClientName, peerAddress, req.ClientId)
+ if alreadyKnown {
+ return fmt.Errorf("duplicated local subscription detected for client %s id %d", clientName, req.ClientId)
+ }
+ defer fs.deleteClient(clientName, req.ClientId)
lastReadTime := time.Unix(0, req.SinceNs)
glog.V(0).Infof(" %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
@@ -99,14 +106,19 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
var processedTsNs int64
var readPersistedLogErr error
var readInMemoryLogErr error
+ var isDone bool
for {
// println("reading from persisted logs ...")
glog.V(0).Infof("read on disk %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- processedTsNs, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, eachLogEntryFn)
+ processedTsNs, isDone, readPersistedLogErr = fs.filer.ReadPersistedLogBuffer(lastReadTime, req.UntilNs, eachLogEntryFn)
if readPersistedLogErr != nil {
+ glog.V(0).Infof("read on disk %v local subscribe %s from %+v: %v", clientName, req.PathPrefix, lastReadTime, readPersistedLogErr)
return fmt.Errorf("reading from persisted logs: %v", readPersistedLogErr)
}
+ if isDone {
+ return nil
+ }
if processedTsNs != 0 {
lastReadTime = time.Unix(0, processedTsNs)
@@ -119,7 +131,7 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
glog.V(0).Infof("read in memory %v local subscribe %s from %+v", clientName, req.PathPrefix, lastReadTime)
- lastReadTime, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, func() bool {
+ lastReadTime, isDone, readInMemoryLogErr = fs.filer.LocalMetaLogBuffer.LoopProcessLogData("localMeta:"+clientName, lastReadTime, req.UntilNs, func() bool {
fs.listenersLock.Lock()
fs.listenersCond.Wait()
fs.listenersLock.Unlock()
@@ -130,11 +142,13 @@ func (fs *FilerServer) SubscribeLocalMetadata(req *filer_pb.SubscribeMetadataReq
continue
}
glog.Errorf("processed to %v: %v", lastReadTime, readInMemoryLogErr)
- time.Sleep(1127 * time.Millisecond)
if readInMemoryLogErr != log_buffer.ResumeError {
break
}
}
+ if isDone {
+ return nil
+ }
}
return readInMemoryLogErr
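
Note: with the new UntilNs bound and isDone flag, a subscription can replay a closed
time window and terminate instead of tailing forever. A hypothetical bounded
subscriber (apply is a placeholder for the caller's event handler):

    stream, err := filerClient.SubscribeMetadata(ctx, &filer_pb.SubscribeMetadataRequest{
        ClientName: "replicator",
        ClientId:   rand.Int31(), // a nonzero id enables duplicate-subscription detection
        PathPrefix: "/buckets/",
        SinceNs:    startNs,
        UntilNs:    stopNs, // the server returns once the log reaches this timestamp
    })
    if err != nil {
        return err
    }
    for {
        resp, err := stream.Recv()
        if err == io.EOF {
            break // the window has been fully replayed
        }
        if err != nil {
            return err
        }
        apply(resp.Directory, resp.EventNotification)
    }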
@@ -201,17 +215,24 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
return nil
}
- if !strings.HasPrefix(fullpath, req.PathPrefix) {
- if eventNotification.NewParentPath != "" {
- newFullPath := util.Join(eventNotification.NewParentPath, entryName)
- if !strings.HasPrefix(newFullPath, req.PathPrefix) {
+ if hasPrefixIn(fullpath, req.PathPrefixes) {
+ // good
+ } else {
+ if !strings.HasPrefix(fullpath, req.PathPrefix) {
+ if eventNotification.NewParentPath != "" {
+ newFullPath := util.Join(eventNotification.NewParentPath, entryName)
+ if !strings.HasPrefix(newFullPath, req.PathPrefix) {
+ return nil
+ }
+ } else {
return nil
}
- } else {
- return nil
}
}
+ // collect timestamps for path
+ stats.FilerServerLastSendTsOfSubscribeGauge.WithLabelValues(fs.option.Host.String(), req.ClientName, req.PathPrefix).Set(float64(tsNs))
+
message := &filer_pb.SubscribeMetadataResponse{
Directory: dirPath,
EventNotification: eventNotification,
@@ -227,12 +248,31 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
}
}
-func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) {
+func hasPrefixIn(text string, prefixes []string) bool {
+ for _, p := range prefixes {
+ if strings.HasPrefix(text, p) {
+ return true
+ }
+ }
+ return false
+}
+
+func (fs *FilerServer) addClient(clientType string, clientAddress string, clientId int32) (alreadyKnown bool, clientName string) {
clientName = clientType + "@" + clientAddress
glog.V(0).Infof("+ listener %v", clientName)
+ if clientId != 0 {
+ fs.knownListenersLock.Lock()
+ _, alreadyKnown = fs.knownListeners[clientId]
+ fs.knownListenersLock.Unlock()
+ }
return
}
-func (fs *FilerServer) deleteClient(clientName string) {
+func (fs *FilerServer) deleteClient(clientName string, clientId int32) {
glog.V(0).Infof("- listener %v", clientName)
+ if clientId != 0 {
+ fs.knownListenersLock.Lock()
+ delete(fs.knownListeners, clientId)
+ fs.knownListenersLock.Unlock()
+ }
}
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 534bc4840..6bf0261ee 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -16,10 +16,12 @@ import (
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/chrislusf/seaweedfs/weed/filer"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/arangodb"
_ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
_ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
_ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
@@ -34,7 +36,9 @@ import (
_ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis"
_ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis3"
_ "github.com/chrislusf/seaweedfs/weed/filer/sqlite"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/ydb"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/notification"
_ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
@@ -46,7 +50,8 @@ import (
)
type FilerOption struct {
- Masters []string
+ Masters map[string]pb.ServerAddress
+ FilerGroup string
Collection string
DefaultReplication string
DisableDirListing bool
@@ -54,21 +59,23 @@ type FilerOption struct {
DirListingLimit int
DataCenter string
Rack string
+ DataNode string
DefaultLevelDbDir string
DisableHttp bool
- Host string
- Port uint32
+ Host pb.ServerAddress
recursiveDelete bool
Cipher bool
SaveToFilerLimit int64
- Filers []string
ConcurrentUploadLimit int64
+ ShowUIDirectoryDelete bool
}
type FilerServer struct {
+ filer_pb.UnimplementedSeaweedFilerServer
option *FilerOption
secret security.SigningKey
filer *filer.Filer
+ filerGuard *security.Guard
grpcDialOption grpc.DialOption
// metrics read from the master
@@ -79,6 +86,10 @@ type FilerServer struct {
listenersLock sync.Mutex
listenersCond *sync.Cond
+ // track known metadata listeners
+ knownListenersLock sync.Mutex
+ knownListeners map[int32]struct{}
+
brokers map[string]map[string]bool
brokersLock sync.Mutex
@@ -88,9 +99,19 @@ type FilerServer struct {
func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) (fs *FilerServer, err error) {
+ v := util.GetViper()
+ signingKey := v.GetString("jwt.filer_signing.key")
+ v.SetDefault("jwt.filer_signing.expires_after_seconds", 10)
+ expiresAfterSec := v.GetInt("jwt.filer_signing.expires_after_seconds")
+
+ readSigningKey := v.GetString("jwt.filer_signing.read.key")
+ v.SetDefault("jwt.filer_signing.read.expires_after_seconds", 60)
+ readExpiresAfterSec := v.GetInt("jwt.filer_signing.read.expires_after_seconds")
+
fs = &FilerServer{
option: option,
grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.filer"),
+ knownListeners: make(map[int32]struct{}),
brokers: make(map[string]map[string]bool),
inFlightDataLimitCond: sync.NewCond(new(sync.Mutex)),
}
@@ -100,20 +121,21 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
+ fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.FilerGroup, option.Collection, option.DefaultReplication, option.DataCenter, func() {
fs.listenersCond.Broadcast()
})
fs.filer.Cipher = option.Cipher
+ // we do not support IP whitelist right now
+ fs.filerGuard = security.NewGuard([]string{}, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
fs.checkWithMaster()
- go stats.LoopPushingMetric("filer", stats.SourceName(fs.option.Port), fs.metricsAddress, fs.metricsIntervalSec)
- go fs.filer.KeepConnectedToMaster()
+ go stats.LoopPushingMetric("filer", string(fs.option.Host), fs.metricsAddress, fs.metricsIntervalSec)
+ go fs.filer.KeepMasterClientConnected()
- v := util.GetViper()
if !util.LoadConfiguration("filer", false) {
- v.Set("leveldb2.enabled", true)
- v.Set("leveldb2.dir", option.DefaultLevelDbDir)
+ v.SetDefault("leveldb2.enabled", true)
+ v.SetDefault("leveldb2.dir", option.DefaultLevelDbDir)
_, err := os.Stat(option.DefaultLevelDbDir)
if os.IsNotExist(err) {
os.MkdirAll(option.DefaultLevelDbDir, 0755)
@@ -130,7 +152,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
// TODO deprecated, will be removed after 2020-12-31
// replaced by https://github.com/chrislusf/seaweedfs/wiki/Path-Specific-Configuration
// fs.filer.FsyncBuckets = v.GetStringSlice("filer.options.buckets_fsync")
- fs.filer.LoadConfiguration(v)
+ isFresh := fs.filer.LoadConfiguration(v)
notification.LoadConfiguration(v, "notification.")
@@ -143,9 +165,15 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
- fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
-
- fs.filer.LoadBuckets()
+ existingNodes := fs.filer.ListExistingPeerUpdates()
+ startFromTime := time.Now().Add(-filer.LogFlushInterval)
+ if isFresh {
+ glog.V(0).Infof("%s bootstrap from peers %+v", option.Host, existingNodes)
+ if err := fs.filer.MaybeBootstrapFromPeers(option.Host, existingNodes, startFromTime); err != nil {
+ glog.Fatalf("%s bootstrap from %+v", option.Host, existingNodes)
+ }
+ }
+ fs.filer.AggregateFromPeers(option.Host, existingNodes, startFromTime)
fs.filer.LoadFilerConf()
@@ -160,17 +188,10 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
func (fs *FilerServer) checkWithMaster() {
- for _, master := range fs.option.Masters {
- _, err := pb.ParseServerToGrpcAddress(master)
- if err != nil {
- glog.Fatalf("invalid master address %s: %v", master, err)
- }
- }
-
isConnected := false
for !isConnected {
for _, master := range fs.option.Masters {
- readErr := operation.WithMasterServerClient(master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ readErr := operation.WithMasterServerClient(false, master, fs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", master, err)
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 118646a04..6f0d0b7ca 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -1,7 +1,9 @@
package weed_server
import (
+ "errors"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/util"
"net/http"
"strings"
@@ -15,6 +17,19 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
start := time.Now()
+ if r.Method == "OPTIONS" {
+ stats.FilerRequestCounter.WithLabelValues("options").Inc()
+ OptionsHandler(w, r, false)
+ stats.FilerRequestHistogram.WithLabelValues("options").Observe(time.Since(start).Seconds())
+ return
+ }
+
+ isReadHttpCall := r.Method == "GET" || r.Method == "HEAD"
+ if !fs.maybeCheckJwtAuthorization(r, !isReadHttpCall) {
+ writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
+ return
+ }
+
// proxy to volume servers
var fileId string
if strings.HasPrefix(r.RequestURI, "/?proxyChunkId=") {
@@ -78,20 +93,31 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
fs.PostHandler(w, r, contentLength)
stats.FilerRequestHistogram.WithLabelValues("post").Observe(time.Since(start).Seconds())
}
- case "OPTIONS":
- stats.FilerRequestCounter.WithLabelValues("options").Inc()
- OptionsHandler(w, r, false)
- stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
}
}
func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Request) {
+
+ start := time.Now()
+
+ // We handle OPTIONS first because it never should be authenticated
+ if r.Method == "OPTIONS" {
+ stats.FilerRequestCounter.WithLabelValues("options").Inc()
+ OptionsHandler(w, r, true)
+ stats.FilerRequestHistogram.WithLabelValues("options").Observe(time.Since(start).Seconds())
+ return
+ }
+
+ if !fs.maybeCheckJwtAuthorization(r, false) {
+ writeJsonError(w, r, http.StatusUnauthorized, errors.New("wrong jwt"))
+ return
+ }
+
w.Header().Set("Server", "SeaweedFS Filer "+util.VERSION)
if r.Header.Get("Origin") != "" {
w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Credentials", "true")
}
- start := time.Now()
switch r.Method {
case "GET":
stats.FilerRequestCounter.WithLabelValues("get").Inc()
@@ -101,10 +127,6 @@ func (fs *FilerServer) readonlyFilerHandler(w http.ResponseWriter, r *http.Reque
stats.FilerRequestCounter.WithLabelValues("head").Inc()
fs.GetOrHeadHandler(w, r)
stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
- case "OPTIONS":
- stats.FilerRequestCounter.WithLabelValues("options").Inc()
- OptionsHandler(w, r, true)
- stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
}
}
@@ -116,3 +138,41 @@ func OptionsHandler(w http.ResponseWriter, r *http.Request, isReadOnly bool) {
}
w.Header().Add("Access-Control-Allow-Headers", "*")
}
+
+// maybeCheckJwtAuthorization returns true if access should be granted, false if it should be denied
+func (fs *FilerServer) maybeCheckJwtAuthorization(r *http.Request, isWrite bool) bool {
+
+ var signingKey security.SigningKey
+
+ if isWrite {
+ if len(fs.filerGuard.SigningKey) == 0 {
+ return true
+ } else {
+ signingKey = fs.filerGuard.SigningKey
+ }
+ } else {
+ if len(fs.filerGuard.ReadSigningKey) == 0 {
+ return true
+ } else {
+ signingKey = fs.filerGuard.ReadSigningKey
+ }
+ }
+
+ tokenStr := security.GetJwt(r)
+ if tokenStr == "" {
+ glog.V(1).Infof("missing jwt from %s", r.RemoteAddr)
+ return false
+ }
+
+ token, err := security.DecodeJwt(signingKey, tokenStr, &security.SeaweedFilerClaims{})
+ if err != nil {
+ glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err)
+ return false
+ }
+ if !token.Valid {
+ glog.V(1).Infof("jwt invalid from %s: %v", r.RemoteAddr, tokenStr)
+ return false
+ } else {
+ return true
+ }
+}
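
Note: a hypothetical client for the new filer JWT gate: once jwt.filer_signing.key
is configured, write requests must carry a token that security.GetJwt can find,
conventionally in the Authorization header (the exact bearer casing is an assumption
here):

    req, err := http.NewRequest("PUT", "http://filer:8888/buckets/a/hello.txt", bytes.NewReader(data))
    if err != nil {
        return err
    }
    if jwt != "" {
        req.Header.Set("Authorization", "BEARER "+jwt) // token minted with the filer signing key
    }
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode == http.StatusUnauthorized {
        return errors.New("wrong jwt")
    }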
diff --git a/weed/server/filer_server_handlers_proxy.go b/weed/server/filer_server_handlers_proxy.go
index b8b28790b..301d609ec 100644
--- a/weed/server/filer_server_handlers_proxy.go
+++ b/weed/server/filer_server_handlers_proxy.go
@@ -3,6 +3,7 @@ package weed_server
import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/util"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
"math/rand"
"net/http"
@@ -62,6 +63,9 @@ func (fs *FilerServer) proxyToVolumeServer(w http.ResponseWriter, r *http.Reques
w.Header()[k] = v
}
w.WriteHeader(proxyResponse.StatusCode)
- io.Copy(w, proxyResponse.Body)
+
+ buf := mem.Allocate(128 * 1024)
+ defer mem.Free(buf)
+ io.CopyBuffer(w, proxyResponse.Body, buf)
}
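
Note: io.CopyBuffer reuses the caller's buffer where io.Copy would allocate a fresh
32KB one per call; mem.Allocate/mem.Free presumably front a size-bucketed pool. The
same effect with only the standard library:

    var copyBufPool = sync.Pool{
        New: func() interface{} {
            b := make([]byte, 128*1024)
            return &b
        },
    }

    func pooledCopy(dst io.Writer, src io.Reader) (int64, error) {
        bufp := copyBufPool.Get().(*[]byte)
        defer copyBufPool.Put(bufp)
        return io.CopyBuffer(dst, src, *bufp)
    }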
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 054a1bd00..28573f7b3 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -4,10 +4,12 @@ import (
"bytes"
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
+ "math"
"mime"
"net/http"
- "net/url"
"path/filepath"
"strconv"
"strings"
@@ -17,11 +19,67 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/images"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
)
+// Validates the preconditions. Returns true if GET/HEAD operation should not proceed.
+// Preconditions supported are:
+// If-Modified-Since
+// If-Unmodified-Since
+// If-Match
+// If-None-Match
+func checkPreconditions(w http.ResponseWriter, r *http.Request, entry *filer.Entry) bool {
+
+ etag := filer.ETagEntry(entry)
+ /// When more than one conditional request header field is present in a
+ /// request, the order in which the fields are evaluated becomes
+ /// important. In practice, the fields defined in this document are
+ /// consistently implemented in a single, logical order, since "lost
+ /// update" preconditions have more strict requirements than cache
+ /// validation, a validated cache is more efficient than a partial
+ /// response, and entity tags are presumed to be more accurate than date
+ /// validators. https://tools.ietf.org/html/rfc7232#section-5
+ if entry.Attr.Mtime.IsZero() {
+ return false
+ }
+ w.Header().Set("Last-Modified", entry.Attr.Mtime.UTC().Format(http.TimeFormat))
+
+ ifMatchETagHeader := r.Header.Get("If-Match")
+ ifUnmodifiedSinceHeader := r.Header.Get("If-Unmodified-Since")
+ if ifMatchETagHeader != "" {
+ if util.CanonicalizeETag(etag) != util.CanonicalizeETag(ifMatchETagHeader) {
+ w.WriteHeader(http.StatusPreconditionFailed)
+ return true
+ }
+ } else if ifUnmodifiedSinceHeader != "" {
+ if t, parseError := time.Parse(http.TimeFormat, ifUnmodifiedSinceHeader); parseError == nil {
+ if t.Before(entry.Attr.Mtime) {
+ w.WriteHeader(http.StatusPreconditionFailed)
+ return true
+ }
+ }
+ }
+
+ ifNoneMatchETagHeader := r.Header.Get("If-None-Match")
+ ifModifiedSinceHeader := r.Header.Get("If-Modified-Since")
+ if ifNoneMatchETagHeader != "" {
+ if util.CanonicalizeETag(etag) == util.CanonicalizeETag(ifNoneMatchETagHeader) {
+ w.WriteHeader(http.StatusNotModified)
+ return true
+ }
+ } else if ifModifiedSinceHeader != "" {
+ if t, parseError := time.Parse(http.TimeFormat, ifModifiedSinceHeader); parseError == nil {
+ if t.After(entry.Attr.Mtime) {
+ w.WriteHeader(http.StatusNotModified)
+ return true
+ }
+ }
+ }
+
+ return false
+}
+
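
Note: a quick conditional-GET exercise against the precondition logic above: send
the ETag from a previous response in If-None-Match and expect 304 Not Modified on a
hit (URL and ETag are placeholders):

    req, _ := http.NewRequest("GET", "http://filer:8888/buckets/a/hello.txt", nil)
    req.Header.Set("If-None-Match", `"<etag-from-last-response>"`)
    resp, err := http.DefaultClient.Do(req)
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    if resp.StatusCode == http.StatusNotModified {
        glog.V(1).Infof("cache still valid for %s", req.URL.Path)
    }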
func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
path := r.URL.Path
@@ -38,11 +96,11 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
if err == filer_pb.ErrNotFound {
glog.V(1).Infof("Not found %s: %v", path, err)
- stats.FilerRequestCounter.WithLabelValues("read.notfound").Inc()
+ stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadNotFound).Inc()
w.WriteHeader(http.StatusNotFound)
} else {
glog.Errorf("Internal %s: %v", path, err)
- stats.FilerRequestCounter.WithLabelValues("read.internalerror").Inc()
+ stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadInternal).Inc()
w.WriteHeader(http.StatusInternalServerError)
}
return
@@ -62,10 +120,22 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
return
}
- // set etag
+ query := r.URL.Query()
+ if query.Get("metadata") == "true" {
+ if query.Get("resolveManifest") == "true" {
+ if entry.Chunks, _, err = filer.ResolveChunkManifest(
+ fs.filer.MasterClient.GetLookupFileIdFunction(),
+ entry.Chunks, 0, math.MaxInt64); err != nil {
+ err = fmt.Errorf("failed to resolve chunk manifest, err: %s", err.Error())
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ }
+ }
+ writeJsonQuiet(w, r, http.StatusOK, entry)
+ return
+ }
+
etag := filer.ETagEntry(entry)
- if ifm := r.Header.Get("If-Match"); ifm != "" && ifm != "\""+etag+"\"" {
- w.WriteHeader(http.StatusPreconditionFailed)
+ if checkPreconditions(w, r, entry) {
return
}
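
Note: the new metadata=true query switches GetOrHeadHandler from serving file
content to serving the entry itself as JSON, optionally resolving chunk manifests
first. A hypothetical fetch:

    resp, err := http.Get("http://filer:8888/buckets/a/hello.txt?metadata=true&resolveManifest=true")
    if err != nil {
        return err
    }
    defer resp.Body.Close()
    var entry map[string]interface{} // or the filer's Entry type, if imported
    if err := json.NewDecoder(resp.Body).Decode(&entry); err != nil {
        return err
    }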
@@ -82,22 +152,12 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.Header().Set("Content-Type", mimeType)
}
- // if modified since
- if !entry.Attr.Mtime.IsZero() {
- w.Header().Set("Last-Modified", entry.Attr.Mtime.UTC().Format(http.TimeFormat))
- if r.Header.Get("If-Modified-Since") != "" {
- if t, parseError := time.Parse(http.TimeFormat, r.Header.Get("If-Modified-Since")); parseError == nil {
- if !t.Before(entry.Attr.Mtime) {
- w.WriteHeader(http.StatusNotModified)
- return
- }
- }
- }
- }
-
// print out the header from extended properties
for k, v := range entry.Extended {
- w.Header().Set(k, string(v))
+ if !strings.HasPrefix(k, "xattr-") {
+ // "xattr-" prefix is set in filesys.XATTR_PREFIX
+ w.Header().Set(k, string(v))
+ }
}
//Seaweed custom header are not visible to Vue or javascript
@@ -111,27 +171,20 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
w.Header().Set("Access-Control-Expose-Headers", strings.Join(seaweedHeaders, ","))
//set tag count
- if r.Method == "GET" {
- tagCount := 0
- for k := range entry.Extended {
- if strings.HasPrefix(k, xhttp.AmzObjectTagging+"-") {
- tagCount++
- }
- }
- if tagCount > 0 {
- w.Header().Set(xhttp.AmzTagCount, strconv.Itoa(tagCount))
+ tagCount := 0
+ for k := range entry.Extended {
+ if strings.HasPrefix(k, s3_constants.AmzObjectTagging+"-") {
+ tagCount++
}
}
-
- if inm := r.Header.Get("If-None-Match"); inm == "\""+etag+"\"" {
- w.WriteHeader(http.StatusNotModified)
- return
+ if tagCount > 0 {
+ w.Header().Set(s3_constants.AmzTagCount, strconv.Itoa(tagCount))
}
+
setEtag(w, etag)
filename := entry.Name()
- filename = url.QueryEscape(filename)
- adjustHeaderContentDisposition(w, r, filename)
+ adjustPassthroughHeaders(w, r, filename)
totalSize := int64(entry.Size())
@@ -147,10 +200,12 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
width, height, mode, shouldResize := shouldResizeImages(ext, r)
if shouldResize {
- data, err := filer.ReadAll(fs.filer.MasterClient, entry.Chunks)
+ data := mem.Allocate(int(totalSize))
+ defer mem.Free(data)
+ err := filer.ReadAll(data, fs.filer.MasterClient, entry.Chunks)
if err != nil {
glog.Errorf("failed to read %s: %v", path, err)
- w.WriteHeader(http.StatusNotModified)
+ w.WriteHeader(http.StatusInternalServerError)
return
}
rs, _, _ := images.Resized(ext, bytes.NewReader(data), width, height, mode)
@@ -163,6 +218,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if offset+size <= int64(len(entry.Content)) {
_, err := writer.Write(entry.Content[offset : offset+size])
if err != nil {
+ stats.FilerRequestCounter.WithLabelValues(stats.ErrorWriteEntry).Inc()
glog.Errorf("failed to write entry content: %v", err)
}
return err
@@ -170,10 +226,12 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
chunks := entry.Chunks
if entry.IsInRemoteOnly() {
dir, name := entry.FullPath.DirAndName()
- if resp, err := fs.DownloadToLocal(context.Background(), &filer_pb.DownloadToLocalRequest{
+ if resp, err := fs.CacheRemoteObjectToLocalCluster(context.Background(), &filer_pb.CacheRemoteObjectToLocalClusterRequest{
Directory: dir,
Name: name,
}); err != nil {
+ stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadCache).Inc()
+ glog.Errorf("CacheRemoteObjectToLocalCluster %s: %v", entry.FullPath, err)
return fmt.Errorf("cache %s: %v", entry.FullPath, err)
} else {
chunks = resp.Entry.Chunks
@@ -182,6 +240,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
err = filer.StreamContent(fs.filer.MasterClient, writer, chunks, offset, size)
if err != nil {
+ stats.FilerRequestCounter.WithLabelValues(stats.ErrorReadStream).Inc()
glog.Errorf("failed to stream content %s: %v", r.URL, err)
}
return err
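
With the metadata branch above, GetOrHeadHandler doubles as a JSON inspection endpoint. A minimal client-side sketch, assuming a filer at localhost:8888 and an illustrative file path (neither is part of this commit):

package main

import (
	"fmt"
	"io"
	"net/http"
)

func main() {
	// metadata=true returns the filer entry as JSON instead of the file body;
	// resolveManifest=true additionally flattens chunk manifests into plain
	// chunks before the entry is serialized.
	resp, err := http.Get("http://localhost:8888/some/dir/file.txt?metadata=true&resolveManifest=true")
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // JSON-encoded entry, chunks resolved
}
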
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index 307c411b6..eaf17fa18 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -2,9 +2,6 @@ package weed_server
import (
"context"
- "encoding/base64"
- "fmt"
- "github.com/skip2/go-qrcode"
"net/http"
"strconv"
"strings"
@@ -49,8 +46,10 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
path = ""
}
+ emptyFolder := true
if len(entries) > 0 {
lastFileName = entries[len(entries)-1].Name()
+ emptyFolder = false
}
glog.V(4).Infof("listDirectory %s, last file %s, limit %d: %d items", path, lastFileName, limit, len(entries))
@@ -62,30 +61,27 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
Limit int
LastFileName string
ShouldDisplayLoadMore bool
+ EmptyFolder bool
}{
path,
entries,
limit,
lastFileName,
shouldDisplayLoadMore,
+ emptyFolder,
})
return
}
- var qrImageString string
- img, err := qrcode.Encode(fmt.Sprintf("http://%s:%d%s", fs.option.Host, fs.option.Port, r.URL.Path), qrcode.Medium, 128)
- if err == nil {
- qrImageString = base64.StdEncoding.EncodeToString(img)
- }
-
- ui.StatusTpl.Execute(w, struct {
+ err = ui.StatusTpl.Execute(w, struct {
Path string
Breadcrumbs []ui.Breadcrumb
Entries interface{}
Limit int
LastFileName string
ShouldDisplayLoadMore bool
- QrImage string
+ EmptyFolder bool
+ ShowDirectoryDelete bool
}{
path,
ui.ToBreadcrumb(path),
@@ -93,6 +89,10 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
limit,
lastFileName,
shouldDisplayLoadMore,
- qrImageString,
+ emptyFolder,
+ fs.option.ShowUIDirectoryDelete,
})
+ if err != nil {
+ glog.V(0).Infof("Template Execute Error: %v", err)
+ }
}
diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go
index 70b5327d6..ae2093947 100644
--- a/weed/server/filer_server_handlers_tagging.go
+++ b/weed/server/filer_server_handlers_tagging.go
@@ -43,7 +43,7 @@ func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request)
}
}
- if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, false); dbErr != nil {
glog.V(0).Infof("failing to update %s tagging : %v", path, dbErr)
writeJsonError(w, r, http.StatusInternalServerError, err)
return
@@ -82,7 +82,9 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque
toDelete := strings.Split(r.URL.Query().Get("tagging"), ",")
deletions := make(map[string]struct{})
for _, deletion := range toDelete {
- deletions[deletion] = struct{}{}
+ if deletion != "" {
+ deletions[deletion] = struct{}{}
+ }
}
// delete all tags or specific tags
@@ -107,7 +109,7 @@ func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Reque
return
}
- if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil, false); dbErr != nil {
glog.V(0).Infof("failing to delete %s tagging : %v", path, dbErr)
writeJsonError(w, r, http.StatusInternalServerError, err)
return
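
The empty-string filter in DeleteTaggingHandler matters because strings.Split on an empty string still yields one empty element; without the filter, a bare ?tagging= would be treated as a request to delete the specific tag "" rather than all tags. A self-contained sketch of the behavior:

package main

import (
	"fmt"
	"strings"
)

func main() {
	toDelete := strings.Split("", ",") // yields [""], one empty element, not zero
	deletions := make(map[string]struct{})
	for _, d := range toDelete {
		if d != "" {
			deletions[d] = struct{}{}
		}
	}
	fmt.Println(len(deletions)) // 0 -> the handler falls back to "delete all tags"
}
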
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 39d983ab7..bbaf28aa8 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -3,6 +3,8 @@ package weed_server
import (
"context"
"errors"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"net/http"
"os"
"strings"
@@ -57,14 +59,21 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
ctx := context.Background()
+ destination := r.RequestURI
+ if finalDestination := r.Header.Get(s3_constants.SeaweedStorageDestinationHeader); finalDestination != "" {
+ destination = finalDestination
+ }
+
query := r.URL.Query()
- so, err := fs.detectStorageOption0(r.RequestURI,
+ so, err := fs.detectStorageOption0(destination,
query.Get("collection"),
query.Get("replication"),
query.Get("ttl"),
query.Get("disk"),
+ query.Get("fsync"),
query.Get("dataCenter"),
query.Get("rack"),
+ query.Get("dataNode"),
)
if err != nil {
if err == ErrReadOnly {
@@ -76,11 +85,78 @@ func (fs *FilerServer) PostHandler(w http.ResponseWriter, r *http.Request, conte
return
}
- fs.autoChunk(ctx, w, r, contentLength, so)
+ if query.Has("mv.from") {
+ fs.move(ctx, w, r, so)
+ } else {
+ fs.autoChunk(ctx, w, r, contentLength, so)
+ }
+
util.CloseRequest(r)
}
+func (fs *FilerServer) move(ctx context.Context, w http.ResponseWriter, r *http.Request, so *operation.StorageOption) {
+ src := r.URL.Query().Get("mv.from")
+ dst := r.URL.Path
+
+ glog.V(2).Infof("FilerServer.move %v to %v", src, dst)
+
+ var err error
+ if src, err = clearName(src); err != nil {
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+ if dst, err = clearName(dst); err != nil {
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+ src = strings.TrimRight(src, "/")
+ if src == "" {
+ err = fmt.Errorf("invalid source '/'")
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+
+ srcPath := util.FullPath(src)
+ dstPath := util.FullPath(dst)
+ srcEntry, err := fs.filer.FindEntry(ctx, srcPath)
+ if err != nil {
+ err = fmt.Errorf("failed to get src entry '%s', err: %s", src, err)
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+
+ oldDir, oldName := srcPath.DirAndName()
+ newDir, newName := dstPath.DirAndName()
+ newName = util.Nvl(newName, oldName)
+
+ dstEntry, err := fs.filer.FindEntry(ctx, util.FullPath(strings.TrimRight(dst, "/")))
+ if err != nil && err != filer_pb.ErrNotFound {
+ err = fmt.Errorf("failed to get dst entry '%s', err: %s", dst, err)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+ if err == nil && !dstEntry.IsDirectory() && srcEntry.IsDirectory() {
+ err = fmt.Errorf("move: cannot overwrite non-directory '%s' with directory '%s'", dst, src)
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+
+ _, err = fs.AtomicRenameEntry(ctx, &filer_pb.AtomicRenameEntryRequest{
+ OldDirectory: oldDir,
+ OldName: oldName,
+ NewDirectory: newDir,
+ NewName: newName,
+ })
+ if err != nil {
+ err = fmt.Errorf("failed to move entry from '%s' to '%s', err: %s", src, dst, err)
+ writeJsonError(w, r, http.StatusBadRequest, err)
+ return
+ }
+
+ w.WriteHeader(http.StatusNoContent)
+}
+
// curl -X DELETE http://localhost:8888/path/to
// curl -X DELETE http://localhost:8888/path/to?recursive=true
// curl -X DELETE http://localhost:8888/path/to?recursive=true&ignoreRecursiveError=true
@@ -115,7 +191,7 @@ func (fs *FilerServer) DeleteHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
-func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack string) (*operation.StorageOption, error) {
+func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication string, ttlSeconds int32, diskType, dataCenter, rack, dataNode string) (*operation.StorageOption, error) {
rule := fs.filer.FilerConf.MatchStorageRule(requestURI)
@@ -124,10 +200,9 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
}
// required by buckets folder
- bucketDefaultCollection, bucketDefaultReplication, fsync := "", "", false
+ bucketDefaultCollection := ""
if strings.HasPrefix(requestURI, fs.filer.DirBucketsPath+"/") {
bucketDefaultCollection = fs.filer.DetectBucket(util.FullPath(requestURI))
- bucketDefaultReplication, fsync = fs.filer.ReadBucketOption(bucketDefaultCollection)
}
if ttlSeconds == 0 {
@@ -139,23 +214,33 @@ func (fs *FilerServer) detectStorageOption(requestURI, qCollection, qReplication
}
return &operation.StorageOption{
- Replication: util.Nvl(qReplication, rule.Replication, bucketDefaultReplication, fs.option.DefaultReplication),
+ Replication: util.Nvl(qReplication, rule.Replication, fs.option.DefaultReplication),
Collection: util.Nvl(qCollection, rule.Collection, bucketDefaultCollection, fs.option.Collection),
- DataCenter: util.Nvl(dataCenter, fs.option.DataCenter),
- Rack: util.Nvl(rack, fs.option.Rack),
+ DataCenter: util.Nvl(dataCenter, rule.DataCenter, fs.option.DataCenter),
+ Rack: util.Nvl(rack, rule.Rack, fs.option.Rack),
+ DataNode: util.Nvl(dataNode, rule.DataNode, fs.option.DataNode),
TtlSeconds: ttlSeconds,
DiskType: util.Nvl(diskType, rule.DiskType),
- Fsync: fsync || rule.Fsync,
+ Fsync: rule.Fsync,
VolumeGrowthCount: rule.VolumeGrowthCount,
}, nil
}
-func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, dataCenter, rack string) (*operation.StorageOption, error) {
+func (fs *FilerServer) detectStorageOption0(requestURI, qCollection, qReplication string, qTtl string, diskType string, fsync string, dataCenter, rack, dataNode string) (*operation.StorageOption, error) {
ttl, err := needle.ReadTTL(qTtl)
if err != nil {
glog.Errorf("fail to parse ttl %s: %v", qTtl, err)
}
- return fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack)
+ so, err := fs.detectStorageOption(requestURI, qCollection, qReplication, int32(ttl.Minutes())*60, diskType, dataCenter, rack, dataNode)
+ if so != nil {
+ if fsync == "false" {
+ so.Fsync = false
+ } else if fsync == "true" {
+ so.Fsync = true
+ }
+ }
+
+ return so, err
}
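
A hedged sketch of the new rename-by-POST flow from the client side, assuming a filer at localhost:8888 (bucket and file names are illustrative). The same query mechanism carries the new per-request fsync override:

package main

import (
	"fmt"
	"net/http"
	"net/url"
)

func main() {
	// Rename /buckets/b/old.txt to /buckets/b/new.txt with a single POST to
	// the destination path; the handler answers 204 No Content on success.
	// Appending fsync=true or fsync=false to a write similarly overrides the
	// storage rule's fsync setting.
	dst := "http://localhost:8888/buckets/b/new.txt?mv.from=" +
		url.QueryEscape("/buckets/b/old.txt")
	resp, err := http.Post(dst, "", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.StatusCode)
}
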
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index a42e0fc97..9c2b9959f 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants"
"io"
"net/http"
"os"
@@ -15,7 +16,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -62,7 +62,8 @@ func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *
}
} else if reply != nil {
if len(md5bytes) > 0 {
- w.Header().Set("Content-MD5", util.Base64Encode(md5bytes))
+ md5InBase64 := util.Base64Encode(md5bytes)
+ w.Header().Set("Content-MD5", md5InBase64)
}
writeJsonQuiet(w, r, http.StatusCreated, reply)
}
@@ -96,6 +97,9 @@ func (fs *FilerServer) doPostAutoChunk(ctx context.Context, w http.ResponseWrite
md5bytes = md5Hash.Sum(nil)
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
+ if replyerr != nil {
+ fs.filer.DeleteChunks(fileChunks)
+ }
return
}
@@ -115,6 +119,9 @@ func (fs *FilerServer) doPutAutoChunk(ctx context.Context, w http.ResponseWriter
md5bytes = md5Hash.Sum(nil)
filerResult, replyerr = fs.saveMetaData(ctx, r, fileName, contentType, so, md5bytes, fileChunks, chunkOffset, smallContent)
+ if replyerr != nil {
+ fs.filer.DeleteChunks(fileChunks)
+ }
return
}
@@ -123,6 +130,10 @@ func isAppend(r *http.Request) bool {
return r.URL.Query().Get("op") == "append"
}
+func skipCheckParentDirEntry(r *http.Request) bool {
+ return r.URL.Query().Get("skipCheckParentDir") == "true"
+}
+
func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileName string, contentType string, so *operation.StorageOption, md5bytes []byte, fileChunks []*filer_pb.FileChunk, chunkOffset int64, content []byte) (filerResult *FilerPostResult, replyerr error) {
// detect file mode
@@ -153,9 +164,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
}
var entry *filer.Entry
+ var newChunks []*filer_pb.FileChunk
var mergedChunks []*filer_pb.FileChunk
+
+ isAppend := isAppend(r)
+ isOffsetWrite := len(fileChunks) > 0 && fileChunks[0].Offset > 0
// when it is an append
- if isAppend(r) {
+ if isAppend || isOffsetWrite {
existingEntry, findErr := fs.filer.FindEntry(ctx, util.FullPath(path))
if findErr != nil && findErr != filer_pb.ErrNotFound {
glog.V(0).Infof("failing to find %s: %v", path, findErr)
@@ -166,11 +181,13 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
entry.Mtime = time.Now()
entry.Md5 = nil
// adjust chunk offsets
- for _, chunk := range fileChunks {
- chunk.Offset += int64(entry.FileSize)
+ if isAppend {
+ for _, chunk := range fileChunks {
+ chunk.Offset += int64(entry.FileSize)
+ }
+ entry.FileSize += uint64(chunkOffset)
}
- mergedChunks = append(entry.Chunks, fileChunks...)
- entry.FileSize += uint64(chunkOffset)
+ newChunks = append(entry.Chunks, fileChunks...)
// TODO
if len(entry.Content) > 0 {
@@ -180,27 +197,31 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
} else {
glog.V(4).Infoln("saving", path)
- mergedChunks = fileChunks
+ newChunks = fileChunks
entry = &filer.Entry{
FullPath: util.FullPath(path),
Attr: filer.Attr{
- Mtime: time.Now(),
- Crtime: time.Now(),
- Mode: os.FileMode(mode),
- Uid: OS_UID,
- Gid: OS_GID,
- Replication: so.Replication,
- Collection: so.Collection,
- TtlSec: so.TtlSeconds,
- DiskType: so.DiskType,
- Mime: contentType,
- Md5: md5bytes,
- FileSize: uint64(chunkOffset),
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: os.FileMode(mode),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ TtlSec: so.TtlSeconds,
+ Mime: contentType,
+ Md5: md5bytes,
+ FileSize: uint64(chunkOffset),
},
Content: content,
}
}
+ // maybe concatenate small chunks into one whole chunk
+ mergedChunks, replyerr = fs.maybeMergeChunks(so, newChunks)
+ if replyerr != nil {
+ glog.V(0).Infof("merge chunks %s: %v", r.RequestURI, replyerr)
+ mergedChunks = newChunks
+ }
+
// maybe compact entry chunks
mergedChunks, replyerr = filer.MaybeManifestize(fs.saveAsChunk(so), mergedChunks)
if replyerr != nil {
@@ -208,6 +229,10 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
return
}
entry.Chunks = mergedChunks
+ if isOffsetWrite {
+ entry.Md5 = nil
+ entry.FileSize = entry.Size()
+ }
filerResult = &FilerPostResult{
Name: fileName,
@@ -217,13 +242,17 @@ func (fs *FilerServer) saveMetaData(ctx context.Context, r *http.Request, fileNa
entry.Extended = SaveAmzMetaData(r, entry.Extended, false)
for k, v := range r.Header {
- if len(v) > 0 && (strings.HasPrefix(k, needle.PairNamePrefix) || k == "Cache-Control" || k == "Expires") {
- entry.Extended[k] = []byte(v[0])
+ if len(v) > 0 && len(v[0]) > 0 {
+ if strings.HasPrefix(k, needle.PairNamePrefix) || k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" {
+ entry.Extended[k] = []byte(v[0])
+ }
+ if k == "Response-Content-Disposition" {
+ entry.Extended["Content-Disposition"] = []byte(v[0])
+ }
}
}
- if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
- fs.filer.DeleteChunks(fileChunks)
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, skipCheckParentDirEntry(r)); dbErr != nil {
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to write %s to filer server : %v", path, dbErr)
@@ -241,7 +270,16 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
}
// upload the chunk to the volume server
- uploadResult, uploadErr, _ := operation.Upload(urlLocation, name, fs.option.Cipher, reader, false, "", nil, auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: urlLocation,
+ Filename: name,
+ Cipher: fs.option.Cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: auth,
+ }
+ uploadResult, uploadErr, _ := operation.Upload(reader, uploadOption)
if uploadErr != nil {
return nil, "", "", uploadErr
}
@@ -291,7 +329,7 @@ func (fs *FilerServer) mkdir(ctx context.Context, w http.ResponseWriter, r *http
Name: util.FullPath(path).Name(),
}
- if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false); dbErr != nil {
replyerr = dbErr
filerResult.Error = dbErr.Error()
glog.V(0).Infof("failing to create dir %s on filer server : %v", path, dbErr)
@@ -308,21 +346,23 @@ func SaveAmzMetaData(r *http.Request, existing map[string][]byte, isReplace bool
}
}
- if sc := r.Header.Get(xhttp.AmzStorageClass); sc != "" {
- metadata[xhttp.AmzStorageClass] = []byte(sc)
+ if sc := r.Header.Get(s3_constants.AmzStorageClass); sc != "" {
+ metadata[s3_constants.AmzStorageClass] = []byte(sc)
}
- if tags := r.Header.Get(xhttp.AmzObjectTagging); tags != "" {
+ if tags := r.Header.Get(s3_constants.AmzObjectTagging); tags != "" {
for _, v := range strings.Split(tags, "&") {
tag := strings.Split(v, "=")
if len(tag) == 2 {
- metadata[xhttp.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
+ metadata[s3_constants.AmzObjectTagging+"-"+tag[0]] = []byte(tag[1])
+ } else if len(tag) == 1 {
+ metadata[s3_constants.AmzObjectTagging+"-"+tag[0]] = nil
}
}
}
for header, values := range r.Header {
- if strings.HasPrefix(header, xhttp.AmzUserMetaPrefix) {
+ if strings.HasPrefix(header, s3_constants.AmzUserMetaPrefix) {
for _, value := range values {
metadata[header] = []byte(value)
}
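
The widened header loop in saveMetaData persists Cache-Control, Expires, and Content-Disposition verbatim, and remaps Response-Content-Disposition so that a later GET serves it as Content-Disposition. A standalone sketch of that mapping, using "Seaweed-" as a stand-in for needle.PairNamePrefix:

package main

import (
	"fmt"
	"strings"
)

func main() {
	headers := map[string][]string{
		"Cache-Control":                {"max-age=3600"},
		"Response-Content-Disposition": {`attachment; filename="report.pdf"`},
		"X-Ignored":                    {"dropped"},
	}
	extended := make(map[string][]byte)
	for k, v := range headers {
		if len(v) > 0 && len(v[0]) > 0 {
			// "Seaweed-" stands in for needle.PairNamePrefix here.
			if strings.HasPrefix(k, "Seaweed-") || k == "Cache-Control" || k == "Expires" || k == "Content-Disposition" {
				extended[k] = []byte(v[0])
			}
			if k == "Response-Content-Disposition" {
				extended["Content-Disposition"] = []byte(v[0])
			}
		}
	}
	fmt.Printf("%q\n", extended) // X-Ignored is dropped; the disposition is remapped
}
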
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index acaa8f5ab..1f10d044e 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -44,7 +44,16 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
// println("detect2 mimetype to", pu.MimeType)
}
- uploadResult, uploadError := operation.UploadData(urlLocation, pu.FileName, true, uncompressedData, false, pu.MimeType, pu.PairMap, auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: urlLocation,
+ Filename: pu.FileName,
+ Cipher: true,
+ IsInputCompressed: false,
+ MimeType: pu.MimeType,
+ PairMap: pu.PairMap,
+ Jwt: auth,
+ }
+ uploadResult, uploadError := operation.UploadData(uncompressedData, uploadOption)
if uploadError != nil {
return nil, fmt.Errorf("upload to volume server: %v", uploadError)
}
@@ -64,17 +73,14 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
entry := &filer.Entry{
FullPath: util.FullPath(path),
Attr: filer.Attr{
- Mtime: time.Now(),
- Crtime: time.Now(),
- Mode: 0660,
- Uid: OS_UID,
- Gid: OS_GID,
- Replication: so.Replication,
- Collection: so.Collection,
- TtlSec: so.TtlSeconds,
- DiskType: so.DiskType,
- Mime: pu.MimeType,
- Md5: util.Base64Md5ToBytes(pu.ContentMd5),
+ Mtime: time.Now(),
+ Crtime: time.Now(),
+ Mode: 0660,
+ Uid: OS_UID,
+ Gid: OS_GID,
+ TtlSec: so.TtlSeconds,
+ Mime: pu.MimeType,
+ Md5: util.Base64Md5ToBytes(pu.ContentMd5),
},
Chunks: fileChunks,
}
@@ -84,7 +90,7 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
Size: int64(pu.OriginalDataSize),
}
- if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil); dbErr != nil {
+ if dbErr := fs.filer.CreateEntry(ctx, entry, false, false, nil, false); dbErr != nil {
fs.filer.DeleteChunks(entry.Chunks)
err = dbErr
filerResult.Error = dbErr.Error()
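
This commit repeatedly replaces operation.Upload's long positional parameter list with an options struct. A condensed sketch of the new call shape, with illustrative values and no live server assumed:

package main

import (
	"bytes"
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	// Mirrors the struct-based signature used in the hunks above; the URL
	// and file id are placeholders.
	opt := &operation.UploadOption{
		UploadUrl:         "http://localhost:8080/3,0123456789",
		Filename:          "report.pdf",
		Cipher:            false,
		IsInputCompressed: false,
		MimeType:          "application/pdf",
		PairMap:           nil,
		Jwt:               "",
	}
	result, err, _ := operation.Upload(bytes.NewReader([]byte("hello")), opt)
	fmt.Println(result, err)
}

Pulling the options into one struct keeps call sites readable as new knobs are added, without growing every caller's argument list.
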
diff --git a/weed/server/filer_server_handlers_write_merge.go b/weed/server/filer_server_handlers_write_merge.go
new file mode 100644
index 000000000..dadc6f726
--- /dev/null
+++ b/weed/server/filer_server_handlers_write_merge.go
@@ -0,0 +1,11 @@
+package weed_server
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (fs *FilerServer) maybeMergeChunks(so *operation.StorageOption, inputChunks []*filer_pb.FileChunk) (mergedChunks []*filer_pb.FileChunk, err error) {
+ //TODO merge consecutive smaller chunks into a large chunk to reduce number of chunks
+ return inputChunks, nil
+}
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 2275ff1bc..fe3346402 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -3,11 +3,12 @@ package weed_server
import (
"bytes"
"crypto/md5"
+ "fmt"
+ "golang.org/x/exp/slices"
"hash"
"io"
- "io/ioutil"
"net/http"
- "sort"
+ "strconv"
"strings"
"sync"
"sync/atomic"
@@ -29,9 +30,25 @@ var bufPool = sync.Pool{
}
func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, chunkOffset int64, uploadErr error, smallContent []byte) {
+ query := r.URL.Query()
+
+ isAppend := isAppend(r)
+ if query.Has("offset") {
+ offset := query.Get("offset")
+ offsetInt, err := strconv.ParseInt(offset, 10, 64)
+ if err != nil || offsetInt < 0 {
+ err = fmt.Errorf("invalid 'offset': '%s'", offset)
+ return nil, nil, 0, err, nil
+ }
+ if isAppend && offsetInt > 0 {
+ err = fmt.Errorf("cannot set offset when op=append")
+ return nil, nil, 0, err, nil
+ }
+ chunkOffset = offsetInt
+ }
md5Hash = md5.New()
- var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
+ var partReader = io.NopCloser(io.TeeReader(reader, md5Hash))
var wg sync.WaitGroup
var bytesBufferCounter int64
@@ -57,14 +74,15 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
dataSize, err := bytesBuffer.ReadFrom(limitedReader)
- // data, err := ioutil.ReadAll(limitedReader)
+ // data, err := io.ReadAll(limitedReader)
if err != nil || dataSize == 0 {
bufPool.Put(bytesBuffer)
atomic.AddInt64(&bytesBufferCounter, -1)
bytesBufferLimitCond.Signal()
+ uploadErr = err
break
}
- if chunkOffset == 0 && !isAppend(r) {
+ if chunkOffset == 0 && !isAppend {
if dataSize < fs.option.SaveToFilerLimit || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) {
chunkOffset += dataSize
smallContent = make([]byte, dataSize)
@@ -109,13 +127,12 @@ func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Reque
wg.Wait()
if uploadErr != nil {
+ fs.filer.DeleteChunks(fileChunks)
return nil, md5Hash, 0, uploadErr, nil
}
-
- sort.Slice(fileChunks, func(i, j int) bool {
- return fileChunks[i].Offset < fileChunks[j].Offset
+ slices.SortFunc(fileChunks, func(a, b *filer_pb.FileChunk) bool {
+ return a.Offset < b.Offset
})
-
return fileChunks, md5Hash, chunkOffset, nil, smallContent
}
@@ -127,7 +144,16 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
}()
- uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: urlLocation,
+ Filename: fileName,
+ Cipher: fs.option.Cipher,
+ IsInputCompressed: false,
+ MimeType: contentType,
+ PairMap: pairMap,
+ Jwt: auth,
+ }
+ uploadResult, err, data := operation.Upload(limitedReader, uploadOption)
if uploadResult != nil && uploadResult.RetryCount > 0 {
stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
}
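
uploadReaderToChunks now honors an explicit write offset. A hedged client sketch, assuming a filer at localhost:8888 and an existing target entry (paths illustrative); note that op=append combined with a non-zero offset is rejected by the parsing above:

package main

import (
	"bytes"
	"fmt"
	"net/http"
)

func main() {
	data := []byte("patched bytes")
	// offset=N places this upload at byte N of the target entry; the handler
	// later clears Md5 and recomputes FileSize, since a partial overwrite
	// invalidates the whole-file digest.
	req, err := http.NewRequest(http.MethodPost,
		"http://localhost:8888/some/dir/file.bin?offset=1048576",
		bytes.NewReader(data))
	if err != nil {
		panic(err)
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status)
}
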
diff --git a/weed/server/filer_server_rocksdb.go b/weed/server/filer_server_rocksdb.go
index 5fcc7e88f..75965e761 100644
--- a/weed/server/filer_server_rocksdb.go
+++ b/weed/server/filer_server_rocksdb.go
@@ -1,3 +1,4 @@
+//go:build rocksdb
// +build rocksdb
package weed_server
diff --git a/weed/server/filer_ui/breadcrumb.go b/weed/server/filer_ui/breadcrumb.go
index 5016117a8..3201ff76c 100644
--- a/weed/server/filer_ui/breadcrumb.go
+++ b/weed/server/filer_ui/breadcrumb.go
@@ -15,8 +15,12 @@ func ToBreadcrumb(fullpath string) (crumbs []Breadcrumb) {
parts := strings.Split(fullpath, "/")
for i := 0; i < len(parts); i++ {
+ name := parts[i]
+ if name == "" {
+ name = "/"
+ }
crumb := Breadcrumb{
- Name: parts[i] + " /",
+ Name: name,
Link: "/" + util.Join(parts[0:i+1]...),
}
if !strings.HasSuffix(crumb.Link, "/") {
diff --git a/weed/server/filer_ui/filer.html b/weed/server/filer_ui/filer.html
index 84dc4d4d6..c9d832e8f 100644
--- a/weed/server/filer_ui/filer.html
+++ b/weed/server/filer_ui/filer.html
@@ -11,6 +11,7 @@
#drop-area {
border: 1px transparent;
+ margin-top: 5px;
}
#drop-area.highlight {
@@ -26,6 +27,12 @@
border-radius: 2px;
border: 1px solid #ccc;
float: right;
+ margin-left: 2px;
+ margin-bottom: 0;
+ }
+
+ label {
+ font-weight: normal;
}
.button:hover {
@@ -36,10 +43,37 @@
display: none;
}
- .qrImage {
+ td, th {
+ vertical-align: bottom;
+ }
+
+ .table-hover > tbody > tr:hover > * > div.operations {
display: block;
- margin-left: auto;
- margin-right: auto;
+ }
+
+ .table > tbody > tr {
+ height: 39px;
+ }
+
+ div.operations {
+ display: none;
+ }
+
+ .footer {
+ position: absolute;
+ bottom: 0px;
+ right: 5%;
+ min-width: 25%;
+ border-left: 1px solid #ccc;
+ border-right: 1px solid #ccc;
+ }
+
+ .add-files {
+ font-size: 46px;
+ text-align: center;
+ border: 1px dashed #999;
+ padding-bottom: 9px;
+ margin: 0 2px;
}
</style>
</head>
@@ -53,12 +87,21 @@
</div>
<div class="row">
<div>
+ <div class="btn-group btn-group-sm pull-right" role="group" style="margin-top:3px;">
+ <label class="btn btn-default" onclick="handleCreateDir()">
+ <span class="glyphicon glyphicon-plus" aria-hidden="true"></span> New Folder
+ </label>
+ <label class="btn btn-default" for="fileElem">
+ <span class="glyphicon glyphicon-cloud-upload" aria-hidden="true"></span> Upload
+ </label>
+ </div>
+ <ol class="breadcrumb">
{{ range $entry := .Breadcrumbs }}
- <a href="{{ printpath $entry.Link }}">
+ <li><a href="{{ printpath $entry.Link }}">
{{ $entry.Name }}
- </a>
+ </a></li>
{{ end }}
- <label class="button" for="fileElem">Upload</label>
+ </ol>
</div>
</div>
@@ -66,117 +109,250 @@
<form class="upload-form">
<input type="file" id="fileElem" multiple onchange="handleFiles(this.files)">
- <table width="90%">
- {{$path := .Path }}
+ {{ if .EmptyFolder }}
+ <div class="row add-files">
+ +
+ </div>
+ {{ else }}
+ <table width="100%" class="table table-hover">
+ {{ $path := .Path }}
+ {{ $showDirDel := .ShowDirectoryDelete }}
{{ range $entry_index, $entry := .Entries }}
<tr>
<td>
- {{if $entry.IsDirectory}}
- <img src="/seaweedfsstatic/images/folder.gif" width="20" height="23">
+ {{ if $entry.IsDirectory }}
+ <span class="glyphicon glyphicon-folder-open" aria-hidden="true"></span>&nbsp;
<a href="{{ printpath $path "/" $entry.Name "/"}}" >
{{ $entry.Name }}
</a>
- {{else}}
+ {{ else }}
<a href="{{ printpath $path "/" $entry.Name }}" >
{{ $entry.Name }}
</a>
- {{end}}
+ {{ end }}
</td>
<td align="right" nowrap>
- {{if $entry.IsDirectory}}
- {{else}}
+ {{ if not $entry.IsDirectory }}
{{ $entry.Mime }}&nbsp;
- {{end}}
+ {{ end }}
</td>
<td align="right" nowrap>
- {{if $entry.IsDirectory}}
- {{else}}
+ {{ if not $entry.IsDirectory }}
{{ $entry.Size | humanizeBytes }}&nbsp;
- {{end}}
+ {{ end }}
</td>
- <td nowrap>
+ <td align="right" nowrap>
{{ $entry.Timestamp.Format "2006-01-02 15:04" }}
</td>
+ <td style="width:75px">
+ <div class="btn-group btn-group-xs pull-right operations" role="group">
+ <label class="btn" onclick="handleRename('{{ $entry.Name }}', '{{ printpath $path "/" }}')">
+ <span class="glyphicon glyphicon-edit" aria-hidden="true"></span>
+ </label>
+ {{ if and $entry.IsDirectory $showDirDel }}
+ <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name "/" }}')">
+ <span class="glyphicon glyphicon-trash" aria-hidden="true"></span>
+ </label>
+ {{ end }}
+ {{ if not $entry.IsDirectory }}
+ <label class="btn" onclick="handleDelete('{{ printpath $path "/" $entry.Name }}')">
+ <span class="glyphicon glyphicon-trash" aria-hidden="true"></span>
+ </label>
+ {{ end }}
+ </div>
+ </td>
</tr>
{{ end }}
-
</table>
+ {{ end }}
</form>
</div>
- {{if .ShouldDisplayLoadMore}}
+ {{ if .ShouldDisplayLoadMore }}
<div class="row">
- <a href={{ print .Path "?limit=" .Limit "&lastFileName=" .LastFileName}} >
+ <a href={{ print .Path "?limit=" .Limit "&lastFileName=" .LastFileName }} >
Load more
</a>
</div>
- {{end}}
+ {{ end }}
<br/>
<br/>
-
- <div class="navbar navbar-fixed-bottom">
- <img src="data:image/png;base64,{{.QrImage}}" class="qrImage"/>
+ <div id="progress-area" class="footer" style="display: none;">
</div>
-
</div>
</body>
<script type="text/javascript">
// ************************ Drag and drop ***************** //
- let dropArea = document.getElementById("drop-area")
+ let dropArea = document.getElementById("drop-area");
+ let progressArea = document.getElementById("progress-area");
// Prevent default drag behaviors
;['dragenter', 'dragover', 'dragleave', 'drop'].forEach(eventName => {
- dropArea.addEventListener(eventName, preventDefaults, false)
- document.body.addEventListener(eventName, preventDefaults, false)
- })
+ dropArea.addEventListener(eventName, preventDefaults, false);
+ document.body.addEventListener(eventName, preventDefaults, false);
+ });
// Highlight drop area when item is dragged over it
;['dragenter', 'dragover'].forEach(eventName => {
- dropArea.addEventListener(eventName, highlight, false)
- })
+ dropArea.addEventListener(eventName, highlight, false);
+ });
;['dragleave', 'drop'].forEach(eventName => {
- dropArea.addEventListener(eventName, unhighlight, false)
- })
+ dropArea.addEventListener(eventName, unhighlight, false);
+ });
// Handle dropped files
- dropArea.addEventListener('drop', handleDrop, false)
+ dropArea.addEventListener('drop', handleDrop, false);
function preventDefaults(e) {
- e.preventDefault()
- e.stopPropagation()
+ e.preventDefault();
+ e.stopPropagation();
}
function highlight(e) {
- dropArea.classList.add('highlight')
+ dropArea.classList.add('highlight');
}
function unhighlight(e) {
- dropArea.classList.remove('highlight')
+ dropArea.classList.remove('highlight');
}
function handleDrop(e) {
- var dt = e.dataTransfer
- var files = dt.files
+ var dt = e.dataTransfer;
+ var files = dt.files;
+
+ handleFiles(files);
+ }
- handleFiles(files)
+ function reloadPage() {
+ window.location.reload(true);
}
+ var uploadList = {};
+
function handleFiles(files) {
- files = [...files]
- files.forEach(uploadFile)
- window.location.reload()
+ files = [...files];
+ files.forEach(startUpload);
+ renderProgress();
+ files.forEach(uploadFile);
+ }
+
+ function startUpload(file, i) {
+ uploadList[file.name] = {'name': file.name, 'percent': 0, 'finish': false};
+ }
+
+ function renderProgress() {
+ var values = Object.values(uploadList);
+ var html = '<table class="table">\n<tr><th>Uploading</th><\/tr>\n';
+ for (let i of values) {
+ var progressBarClass = 'progress-bar-striped active';
+ if (i.percent >= 100) {
+ progressBarClass = 'progress-bar-success';
+ }
+ html += '<tr>\n<td>\n';
+ html += '<div class="progress" style="margin-bottom: 2px;">\n';
+ html += '<div class="progress-bar ' + progressBarClass + '" role="progressbar" aria-valuenow="' + '100" aria-valuemin="0" aria-valuemax="100" style="width:' + i.percent + '%;">';
+ html += '<span style="margin-right: 10px;">' + i.name + '</span>' + i.percent + '%<\/div>';
+ html += '<\/div>\n<\/td>\n<\/tr>\n';
+ }
+ html += '<\/table>\n';
+ progressArea.innerHTML = html;
+ if (values.length > 0) {
+ progressArea.attributes.style.value = '';
+ }
+ }
+
+ function reportProgress(file, percent) {
+ var item = uploadList[file]
+ item.percent = percent;
+ renderProgress();
+ }
+
+ function finishUpload(file) {
+ uploadList[file]['finish'] = true;
+ renderProgress();
+ var allFinish = true;
+ for (let i of Object.values(uploadList)) {
+ if (!i.finish) {
+ allFinish = false;
+ break;
+ }
+ }
+ if (allFinish) {
+ console.log('All Finish');
+ reloadPage();
+ }
}
function uploadFile(file, i) {
- var url = window.location.href
- var xhr = new XMLHttpRequest()
- var formData = new FormData()
- xhr.open('POST', url, false)
+ var url = window.location.href;
+ var xhr = new XMLHttpRequest();
+ var fileName = file.name;
+ xhr.upload.addEventListener('progress', function(e) {
+ if (e.lengthComputable) {
+ var percent = Math.ceil((e.loaded / e.total) * 100);
+ reportProgress(fileName, percent)
+ }
+ });
+ xhr.upload.addEventListener('loadend', function(e) {
+ finishUpload(fileName);
+ });
+ var formData = new FormData();
+ xhr.open('POST', url, true);
+ formData.append('file', file);
+ xhr.send(formData);
+ }
+
+ function handleCreateDir() {
+ var dirName = prompt('Folder Name:', '');
+ if (dirName == null || dirName.trim() == '') {
+ return;
+ }
+ dirName = dirName.trim();
+ var baseUrl = window.location.href;
+ if (!baseUrl.endsWith('/')) {
+ baseUrl += '/';
+ }
+ var url = baseUrl + dirName;
+ if (!url.endsWith('/')) {
+ url += '/';
+ }
+ var xhr = new XMLHttpRequest();
+ xhr.open('POST', url, false);
+ xhr.setRequestHeader('Content-Type', '');
+ xhr.send();
+ reloadPage();
+ }
+
+ function handleRename(originName, basePath) {
+ var newName = prompt('New Name:', originName);
+ if (newName == null || newName == '') {
+ return;
+ }
+ var url = basePath + newName;
+ var originPath = basePath + originName;
+ url += '?mv.from=' + originPath;
+ var xhr = new XMLHttpRequest();
+ xhr.open('POST', url, false);
+ xhr.setRequestHeader('Content-Type', '');
+ xhr.send();
+ reloadPage();
+ }
+
+ function handleDelete(path) {
+ if (!confirm('Are you sure you want to delete ' + path + '?')) {
+ return;
+ }
+ var url = path;
+ if (url.endsWith('/')) {
+ url += '?recursive=true';
+ }
- formData.append('file', file)
- xhr.send(formData)
+ var xhr = new XMLHttpRequest();
+ xhr.open('DELETE', url, false);
+ xhr.send();
+ reloadPage();
}
</script>
</html>
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index afd479b21..4d0fbbc41 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -2,12 +2,17 @@ package weed_server
import (
"context"
+ "errors"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage/backend"
"net"
- "strings"
+ "sort"
"time"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/stats"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/util"
+
"github.com/chrislusf/raft"
"google.golang.org/grpc/peer"
@@ -17,16 +22,56 @@ import (
"github.com/chrislusf/seaweedfs/weed/topology"
)
+func (ms *MasterServer) RegisterUuids(heartbeat *master_pb.Heartbeat) (duplicated_uuids []string, err error) {
+ ms.Topo.UuidAccessLock.Lock()
+ defer ms.Topo.UuidAccessLock.Unlock()
+ key := fmt.Sprintf("%s:%d", heartbeat.Ip, heartbeat.Port)
+ if ms.Topo.UuidMap == nil {
+ ms.Topo.UuidMap = make(map[string][]string)
+ }
+ // find whether new uuid exists
+ for k, v := range ms.Topo.UuidMap {
+ sort.Strings(v)
+ for _, id := range heartbeat.LocationUuids {
+ index := sort.SearchStrings(v, id)
+ if index < len(v) && v[index] == id {
+ duplicated_uuids = append(duplicated_uuids, id)
+ glog.Errorf("directory of %s on %s has been loaded", id, k)
+ }
+ }
+ }
+ if len(duplicated_uuids) > 0 {
+ return duplicated_uuids, errors.New("volume: Duplicated volume directories were loaded")
+ }
+
+ ms.Topo.UuidMap[key] = heartbeat.LocationUuids
+ glog.V(0).Infof("found new uuid:%v %v , %v", key, heartbeat.LocationUuids, ms.Topo.UuidMap)
+ return nil, nil
+}
+
+func (ms *MasterServer) UnRegisterUuids(ip string, port int) {
+ ms.Topo.UuidAccessLock.Lock()
+ defer ms.Topo.UuidAccessLock.Unlock()
+ key := fmt.Sprintf("%s:%d", ip, port)
+ delete(ms.Topo.UuidMap, key)
+ glog.V(0).Infof("remove volume server %v, online volume server: %v", key, ms.Topo.UuidMap)
+}
+
func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServer) error {
var dn *topology.DataNode
defer func() {
if dn != nil {
-
+ dn.Counter--
+ if dn.Counter > 0 {
+ glog.V(0).Infof("disconnect phantom volume server %s:%d remaining %d", dn.Ip, dn.Port, dn.Counter)
+ return
+ }
// if the volume server disconnects and reconnects quickly
// the unregister and register can race with each other
ms.Topo.UnRegisterDataNode(dn)
glog.V(0).Infof("unregister disconnected volume server %s:%d", dn.Ip, dn.Port)
+ ms.UnRegisterUuids(dn.Ip, dn.Port)
message := &master_pb.VolumeLocation{
Url: dn.Url(),
@@ -40,13 +85,8 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if len(message.DeletedVids) > 0 {
- ms.clientChansLock.RLock()
- for _, ch := range ms.clientChans {
- ch <- message
- }
- ms.clientChansLock.RUnlock()
+ ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
}
-
}
}()
@@ -58,6 +98,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
} else {
glog.Warningf("SendHeartbeat.Recv: %v", err)
}
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("error").Inc()
return err
}
@@ -67,19 +108,34 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
- dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
- glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
+ dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), int(heartbeat.GrpcPort), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
+ glog.V(0).Infof("added volume server %d: %v:%d %v", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort(), heartbeat.LocationUuids)
+ uuidlist, err := ms.RegisterUuids(heartbeat)
+ if err != nil {
+ if stream_err := stream.Send(&master_pb.HeartbeatResponse{
+ DuplicatedUuids: uuidlist,
+ }); stream_err != nil {
+ glog.Warningf("SendHeartbeat.Send DuplicatedDirectory response to %s:%d %v", dn.Ip, dn.Port, stream_err)
+ return stream_err
+ }
+ return err
+ }
+
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
}); err != nil {
glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err
}
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("dataNode").Inc()
+ dn.Counter++
}
dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts)
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("total").Inc()
+
var dataCenter string
if dc := dn.GetDataCenter(); dc != nil {
dataCenter = string(dc.Id())
@@ -89,6 +145,12 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
PublicUrl: dn.PublicUrl,
DataCenter: dataCenter,
}
+ if len(heartbeat.NewVolumes) > 0 {
+ stats.FilerRequestCounter.WithLabelValues("newVolumes").Inc()
+ }
+ if len(heartbeat.DeletedVolumes) > 0 {
+ stats.FilerRequestCounter.WithLabelValues("deletedVolumes").Inc()
+ }
if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
// process delta volume ids if exists for fast volume id updates
for _, volInfo := range heartbeat.NewVolumes {
@@ -102,7 +164,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if len(heartbeat.Volumes) > 0 || heartbeat.HasNoVolumes {
+ dcName, rackName := ms.Topo.Configuration.Locate(heartbeat.Ip, heartbeat.DataCenter, heartbeat.Rack)
+ ms.Topo.DataNodeRegistration(dcName, rackName, dn)
+
// process heartbeat.Volumes
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("Volumes").Inc()
newVolumes, deletedVolumes := ms.Topo.SyncDataNodeRegistration(heartbeat.Volumes, dn)
for _, v := range newVolumes {
@@ -116,45 +182,41 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
if len(heartbeat.NewEcShards) > 0 || len(heartbeat.DeletedEcShards) > 0 {
-
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("newEcShards").Inc()
// update master internal volume layouts
ms.Topo.IncrementalSyncDataNodeEcShards(heartbeat.NewEcShards, heartbeat.DeletedEcShards, dn)
for _, s := range heartbeat.NewEcShards {
- message.NewVids = append(message.NewVids, s.Id)
+ message.NewEcVids = append(message.NewEcVids, s.Id)
}
for _, s := range heartbeat.DeletedEcShards {
- if dn.HasVolumesById(needle.VolumeId(s.Id)) {
+ if dn.HasEcShards(needle.VolumeId(s.Id)) {
continue
}
- message.DeletedVids = append(message.DeletedVids, s.Id)
+ message.DeletedEcVids = append(message.DeletedEcVids, s.Id)
}
}
if len(heartbeat.EcShards) > 0 || heartbeat.HasNoEcShards {
- glog.V(1).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
+ stats.MasterReceivedHeartbeatCounter.WithLabelValues("ecShards").Inc()
+ glog.V(4).Infof("master received ec shards from %s: %+v", dn.Url(), heartbeat.EcShards)
newShards, deletedShards := ms.Topo.SyncDataNodeEcShards(heartbeat.EcShards, dn)
// broadcast the ec vid changes to master clients
for _, s := range newShards {
- message.NewVids = append(message.NewVids, uint32(s.VolumeId))
+ message.NewEcVids = append(message.NewEcVids, uint32(s.VolumeId))
}
for _, s := range deletedShards {
if dn.HasVolumesById(s.VolumeId) {
continue
}
- message.DeletedVids = append(message.DeletedVids, uint32(s.VolumeId))
+ message.DeletedEcVids = append(message.DeletedEcVids, uint32(s.VolumeId))
}
}
- if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
- ms.clientChansLock.RLock()
- for host, ch := range ms.clientChans {
- glog.V(0).Infof("master send to %s: %s", host, message.String())
- ch <- message
- }
- ms.clientChansLock.RUnlock()
+ if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 || len(message.NewEcVids) > 0 || len(message.DeletedEcVids) > 0 {
+ ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: message})
}
// tell the volume servers about the leader
@@ -164,7 +226,7 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
return err
}
if err := stream.Send(&master_pb.HeartbeatResponse{
- Leader: newLeader,
+ Leader: string(newLeader),
}); err != nil {
glog.Warningf("SendHeartbeat.Send response to to %s:%d %v", dn.Ip, dn.Port, err)
return err
@@ -185,17 +247,25 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
return ms.informNewLeader(stream)
}
- peerAddress := findClientAddress(stream.Context(), req.GrpcPort)
+ peerAddress := pb.ServerAddress(req.ClientAddress)
// buffer by 1 so we don't end up getting stuck writing to stopChan forever
stopChan := make(chan bool, 1)
- clientName, messageChan := ms.addClient(req.Name, peerAddress)
+ clientName, messageChan := ms.addClient(req.FilerGroup, req.ClientType, peerAddress)
+ for _, update := range ms.Cluster.AddClusterNode(req.FilerGroup, req.ClientType, peerAddress, req.Version) {
+ ms.broadcastToClients(update)
+ }
- defer ms.deleteClient(clientName)
+ defer func() {
+ for _, update := range ms.Cluster.RemoveClusterNode(req.FilerGroup, req.ClientType, peerAddress) {
+ ms.broadcastToClients(update)
+ }
+ ms.deleteClient(clientName)
+ }()
for _, message := range ms.Topo.ToVolumeLocations() {
- if sendErr := stream.Send(message); sendErr != nil {
+ if sendErr := stream.Send(&master_pb.KeepConnectedResponse{VolumeLocation: message}); sendErr != nil {
return sendErr
}
}
@@ -221,7 +291,10 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
}
case <-ticker.C:
if !ms.Topo.IsLeader() {
+ stats.MasterRaftIsleader.Set(0)
return ms.informNewLeader(stream)
+ } else {
+ stats.MasterRaftIsleader.Set(1)
}
case <-stopChan:
return nil
@@ -230,22 +303,32 @@ func (ms *MasterServer) KeepConnected(stream master_pb.Seaweed_KeepConnectedServ
}
+func (ms *MasterServer) broadcastToClients(message *master_pb.KeepConnectedResponse) {
+ ms.clientChansLock.RLock()
+ for _, ch := range ms.clientChans {
+ ch <- message
+ }
+ ms.clientChansLock.RUnlock()
+}
+
func (ms *MasterServer) informNewLeader(stream master_pb.Seaweed_KeepConnectedServer) error {
leader, err := ms.Topo.Leader()
if err != nil {
glog.Errorf("topo leader: %v", err)
return raft.NotLeaderError
}
- if err := stream.Send(&master_pb.VolumeLocation{
- Leader: leader,
+ if err := stream.Send(&master_pb.KeepConnectedResponse{
+ VolumeLocation: &master_pb.VolumeLocation{
+ Leader: string(leader),
+ },
}); err != nil {
return err
}
return nil
}
-func (ms *MasterServer) addClient(clientType string, clientAddress string) (clientName string, messageChan chan *master_pb.VolumeLocation) {
- clientName = clientType + "@" + clientAddress
+func (ms *MasterServer) addClient(filerGroup, clientType string, clientAddress pb.ServerAddress) (clientName string, messageChan chan *master_pb.KeepConnectedResponse) {
+ clientName = filerGroup + "." + clientType + "@" + string(clientAddress)
glog.V(0).Infof("+ client %v", clientName)
// we buffer this because otherwise we end up in a potential deadlock where
@@ -253,7 +336,7 @@ func (ms *MasterServer) addClient(clientType string, clientAddress string) (clie
// trying to send to it in SendHeartbeat and so we can't lock the
// clientChansLock to remove the channel and we're stuck writing to it
// 100 is probably overkill
- messageChan = make(chan *master_pb.VolumeLocation, 100)
+ messageChan = make(chan *master_pb.KeepConnectedResponse, 100)
ms.clientChansLock.Lock()
ms.clientChans[clientName] = messageChan
@@ -284,25 +367,12 @@ func findClientAddress(ctx context.Context, grpcPort uint32) string {
}
if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok {
externalIP := tcpAddr.IP
- return fmt.Sprintf("%s:%d", externalIP, grpcPort)
+ return util.JoinHostPort(externalIP.String(), int(grpcPort))
}
return pr.Addr.String()
}
-func (ms *MasterServer) ListMasterClients(ctx context.Context, req *master_pb.ListMasterClientsRequest) (*master_pb.ListMasterClientsResponse, error) {
- resp := &master_pb.ListMasterClientsResponse{}
- ms.clientChansLock.RLock()
- defer ms.clientChansLock.RUnlock()
-
- for k := range ms.clientChans {
- if strings.HasPrefix(k, req.ClientType+"@") {
- resp.GrpcAddresses = append(resp.GrpcAddresses, k[len(req.ClientType)+1:])
- }
- }
- return resp, nil
-}
-
func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_pb.GetMasterConfigurationRequest) (*master_pb.GetMasterConfigurationResponse, error) {
// tell the volume servers about the leader
@@ -315,7 +385,7 @@ func (ms *MasterServer) GetMasterConfiguration(ctx context.Context, req *master_
DefaultReplication: ms.option.DefaultReplicaPlacement,
VolumeSizeLimitMB: uint32(ms.option.VolumeSizeLimitMB),
VolumePreallocate: ms.option.VolumePreallocate,
- Leader: leader,
+ Leader: string(leader),
}
return resp, nil
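
RegisterUuids rejects a heartbeat whose volume directory UUIDs are already claimed by another volume server, which catches two servers accidentally pointed at the same directory. A condensed, self-contained sketch of the check:

package main

import (
	"fmt"
	"sort"
)

func main() {
	// uuidMap mirrors Topo.UuidMap: "ip:port" -> UUIDs of loaded volume directories.
	uuidMap := map[string][]string{
		"10.0.0.1:8080": {"a1", "b2"},
	}
	incoming := []string{"b2", "c3"} // a new heartbeat claiming b2 again

	var duplicated []string
	for _, loaded := range uuidMap {
		sort.Strings(loaded)
		for _, id := range incoming {
			if i := sort.SearchStrings(loaded, id); i < len(loaded) && loaded[i] == id {
				duplicated = append(duplicated, id)
			}
		}
	}
	fmt.Println(duplicated) // [b2] -> the heartbeat is answered with DuplicatedUuids
}
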
diff --git a/weed/server/master_grpc_server_admin.go b/weed/server/master_grpc_server_admin.go
index 93c9e4e4e..1f37e979a 100644
--- a/weed/server/master_grpc_server_admin.go
+++ b/weed/server/master_grpc_server_admin.go
@@ -3,7 +3,11 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
"github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"math/rand"
"sync"
"time"
@@ -62,6 +66,7 @@ type AdminLock struct {
accessSecret int64
accessLockTime time.Time
lastClient string
+ lastMessage string
}
type AdminLocks struct {
@@ -75,15 +80,15 @@ func NewAdminLocks() *AdminLocks {
}
}
-func (locks *AdminLocks) isLocked(lockName string) (clientName string, isLocked bool) {
+func (locks *AdminLocks) isLocked(lockName string) (clientName string, message string, isLocked bool) {
locks.RLock()
defer locks.RUnlock()
adminLock, found := locks.locks[lockName]
if !found {
- return "", false
+ return "", "", false
}
- glog.V(4).Infof("isLocked %v", adminLock.lastClient)
- return adminLock.lastClient, adminLock.accessLockTime.Add(LockDuration).After(time.Now())
+ glog.V(4).Infof("isLocked %v: %v", adminLock.lastClient, adminLock.lastMessage)
+ return adminLock.lastClient, adminLock.lastMessage, adminLock.accessLockTime.Add(LockDuration).After(time.Now())
}
func (locks *AdminLocks) isValidToken(lockName string, ts time.Time, token int64) bool {
@@ -117,7 +122,7 @@ func (locks *AdminLocks) deleteLock(lockName string) {
func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.LeaseAdminTokenRequest) (*master_pb.LeaseAdminTokenResponse, error) {
resp := &master_pb.LeaseAdminTokenResponse{}
- if lastClient, isLocked := ms.adminLocks.isLocked(req.LockName); isLocked {
+ if lastClient, lastMessage, isLocked := ms.adminLocks.isLocked(req.LockName); isLocked {
glog.V(4).Infof("LeaseAdminToken %v", lastClient)
if req.PreviousToken != 0 && ms.adminLocks.isValidToken(req.LockName, time.Unix(0, req.PreviousLockTime), req.PreviousToken) {
// for renew
@@ -126,7 +131,7 @@ func (ms *MasterServer) LeaseAdminToken(ctx context.Context, req *master_pb.Leas
return resp, nil
}
// refuse since still locked
- return resp, fmt.Errorf("already locked by " + lastClient)
+ return resp, fmt.Errorf("already locked by %v: %v", lastClient, lastMessage)
}
// for fresh lease request
ts, token := ms.adminLocks.generateToken(req.LockName, req.ClientName)
@@ -141,3 +146,41 @@ func (ms *MasterServer) ReleaseAdminToken(ctx context.Context, req *master_pb.Re
}
return resp, nil
}
+
+func (ms *MasterServer) Ping(ctx context.Context, req *master_pb.PingRequest) (resp *master_pb.PingResponse, pingErr error) {
+ resp = &master_pb.PingResponse{
+ StartTimeNs: time.Now().UnixNano(),
+ }
+ if req.TargetType == cluster.FilerType {
+ pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if req.TargetType == cluster.VolumeServerType {
+ pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if req.TargetType == cluster.MasterType {
+ pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), ms.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if pingErr != nil {
+ pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
+ }
+ resp.StopTimeNs = time.Now().UnixNano()
+ return
+}
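
The new Ping RPC reports three timestamps, which is enough for a caller to estimate round-trip time and clock skew between nodes. A hedged sketch of that arithmetic (the NTP-style midpoint estimate is an interpretation, not something this commit computes):

package main

import "fmt"

// pingResponse mirrors the three timestamps in master_pb.PingResponse.
type pingResponse struct {
	StartTimeNs  int64 // stamped by the handler on entry
	RemoteTimeNs int64 // the pinged target's own start time
	StopTimeNs   int64 // stamped by the handler before returning
}

func main() {
	resp := pingResponse{StartTimeNs: 1_000, RemoteTimeNs: 1_700, StopTimeNs: 2_000}
	rtt := resp.StopTimeNs - resp.StartTimeNs
	// Assume the remote clock was sampled near the round-trip midpoint.
	skew := resp.RemoteTimeNs - (resp.StartTimeNs + rtt/2)
	fmt.Println(rtt, skew) // 1000 200
}
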
diff --git a/weed/server/master_grpc_server_cluster.go b/weed/server/master_grpc_server_cluster.go
new file mode 100644
index 000000000..fea4a66aa
--- /dev/null
+++ b/weed/server/master_grpc_server_cluster.go
@@ -0,0 +1,41 @@
+package weed_server
+
+import (
+ "context"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "math/rand"
+)
+
+func (ms *MasterServer) ListClusterNodes(ctx context.Context, req *master_pb.ListClusterNodesRequest) (*master_pb.ListClusterNodesResponse, error) {
+ resp := &master_pb.ListClusterNodesResponse{}
+ filerGroup := cluster.FilerGroup(req.FilerGroup)
+ clusterNodes := ms.Cluster.ListClusterNode(filerGroup, req.ClientType)
+
+ for _, node := range clusterNodes {
+ resp.ClusterNodes = append(resp.ClusterNodes, &master_pb.ListClusterNodesResponse_ClusterNode{
+ Address: string(node.Address),
+ Version: node.Version,
+ IsLeader: ms.Cluster.IsOneLeader(filerGroup, node.Address),
+ CreatedAtNs: node.CreatedTs.UnixNano(),
+ })
+ }
+ return resp, nil
+}
+
+func (ms *MasterServer) GetOneFiler(filerGroup cluster.FilerGroup) pb.ServerAddress {
+
+ clusterNodes := ms.Cluster.ListClusterNode(filerGroup, cluster.FilerType)
+
+ var filers []pb.ServerAddress
+ for _, node := range clusterNodes {
+ if ms.Cluster.IsOneLeader(filerGroup, node.Address) {
+ filers = append(filers, node.Address)
+ }
+ }
+ if len(filers) > 0 {
+ return filers[rand.Intn(len(filers))]
+ }
+ return "localhost:8888"
+}
diff --git a/weed/server/master_grpc_server_collection.go b/weed/server/master_grpc_server_collection.go
index b92d6bcbe..654da6b3c 100644
--- a/weed/server/master_grpc_server_collection.go
+++ b/weed/server/master_grpc_server_collection.go
@@ -58,7 +58,7 @@ func (ms *MasterServer) doDeleteNormalCollection(collectionName string) error {
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collectionName,
})
@@ -78,7 +78,7 @@ func (ms *MasterServer) doDeleteEcCollection(collectionName string) error {
listOfEcServers := ms.Topo.ListEcServersByCollection(collectionName)
for _, server := range listOfEcServers {
- err := operation.WithVolumeServerClient(server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, server, ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collectionName,
})
diff --git a/weed/server/master_grpc_server_raft.go b/weed/server/master_grpc_server_raft.go
new file mode 100644
index 000000000..258b6beac
--- /dev/null
+++ b/weed/server/master_grpc_server_raft.go
@@ -0,0 +1,79 @@
+package weed_server
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/hashicorp/raft"
+)
+
+func (ms *MasterServer) RaftListClusterServers(ctx context.Context, req *master_pb.RaftListClusterServersRequest) (*master_pb.RaftListClusterServersResponse, error) {
+ resp := &master_pb.RaftListClusterServersResponse{}
+
+ if ms.Topo.HashicorpRaft == nil {
+ return resp, nil
+ }
+
+ servers := ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers
+
+ for _, server := range servers {
+ resp.ClusterServers = append(resp.ClusterServers, &master_pb.RaftListClusterServersResponse_ClusterServers{
+ Id: string(server.ID),
+ Address: string(server.Address),
+ Suffrage: server.Suffrage.String(),
+ })
+ }
+ return resp, nil
+}
+
+func (ms *MasterServer) RaftAddServer(ctx context.Context, req *master_pb.RaftAddServerRequest) (*master_pb.RaftAddServerResponse, error) {
+ resp := &master_pb.RaftAddServerResponse{}
+
+ if ms.Topo.HashicorpRaft == nil {
+ return resp, nil
+ }
+
+ if ms.Topo.HashicorpRaft.State() != raft.Leader {
+ return nil, fmt.Errorf("raft add server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
+ }
+
+ var idxFuture raft.IndexFuture
+ if req.Voter {
+ idxFuture = ms.Topo.HashicorpRaft.AddVoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0)
+ } else {
+ idxFuture = ms.Topo.HashicorpRaft.AddNonvoter(raft.ServerID(req.Id), raft.ServerAddress(req.Address), 0, 0)
+ }
+
+ if err := idxFuture.Error(); err != nil {
+ return nil, err
+ }
+ return resp, nil
+}
+
+func (ms *MasterServer) RaftRemoveServer(ctx context.Context, req *master_pb.RaftRemoveServerRequest) (*master_pb.RaftRemoveServerResponse, error) {
+ resp := &master_pb.RaftRemoveServerResponse{}
+
+ if ms.Topo.HashicorpRaft == nil {
+ return resp, nil
+ }
+
+ if ms.Topo.HashicorpRaft.State() != raft.Leader {
+ return nil, fmt.Errorf("raft remove server %s failed: %s is no current leader", req.Id, ms.Topo.HashicorpRaft.String())
+ }
+
+ if !req.Force {
+ ms.clientChansLock.RLock()
+ _, ok := ms.clientChans[fmt.Sprintf("%s@%s", cluster.MasterType, req.Id)]
+ ms.clientChansLock.RUnlock()
+ if ok {
+ return resp, fmt.Errorf("raft remove server %s failed: client connection to master exists", req.Id)
+ }
+ }
+
+ idxFuture := ms.Topo.HashicorpRaft.RemoveServer(raft.ServerID(req.Id), 0, 0)
+ if err := idxFuture.Error(); err != nil {
+ return nil, err
+ }
+ return resp, nil
+}
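
These three RPCs are thin wrappers around hashicorp/raft's membership futures, with a leader check up front. A minimal client-side sketch (not part of this change) of driving them through the master client helper that appears elsewhere in this diff; the generated stub method names are assumed to match the server handlers above:

package main

import (
	"context"
	"fmt"

	"github.com/chrislusf/seaweedfs/weed/pb"
	"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
	"google.golang.org/grpc"
	"google.golang.org/grpc/credentials/insecure"
)

func listRaftServers(leader pb.ServerAddress, dialOpt grpc.DialOption) error {
	return pb.WithMasterClient(false, leader, dialOpt, func(client master_pb.SeaweedClient) error {
		resp, err := client.RaftListClusterServers(context.Background(), &master_pb.RaftListClusterServersRequest{})
		if err != nil {
			return err
		}
		for _, s := range resp.ClusterServers {
			// Suffrage is "Voter" or "Nonvoter", mirroring hashicorp/raft.
			fmt.Printf("%s %s %s\n", s.Id, s.Address, s.Suffrage)
		}
		return nil
	})
}

func main() {
	dialOpt := grpc.WithTransportCredentials(insecure.NewCredentials())
	if err := listRaftServers(pb.ServerAddress("localhost:9333"), dialOpt); err != nil {
		fmt.Println(err)
	}
}
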
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 4b975a0c4..0382c2dae 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -42,15 +42,24 @@ func (ms *MasterServer) ProcessGrowRequest() {
return !found
})
+ option := req.Option
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
+
// not atomic but it's okay
- if !found && ms.shouldVolumeGrow(req.Option) {
+ if !found && vl.ShouldGrowVolumes(option) {
filter.Store(req, nil)
// we have lock called inside vg
go func() {
glog.V(1).Infoln("starting automatic volume grow")
start := time.Now()
- _, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
+ newVidLocations, err := ms.vg.AutomaticGrowByType(req.Option, ms.grpcDialOption, ms.Topo, req.Count)
glog.V(1).Infoln("finished automatic volume grow, cost ", time.Now().Sub(start))
+ if err == nil {
+ for _, newVidLocation := range newVidLocations {
+ ms.broadcastToClients(&master_pb.KeepConnectedResponse{VolumeLocation: newVidLocation})
+ }
+ }
+ vl.DoneGrowRequest()
if req.ErrCh != nil {
req.ErrCh <- err
@@ -82,7 +91,7 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV
}
var auth string
if strings.Contains(result.VolumeOrFileId, ",") { // this is a file id
- auth = string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, result.VolumeOrFileId))
+ auth = string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, result.VolumeOrFileId))
}
resp.VolumeIdLocations = append(resp.VolumeIdLocations, &master_pb.LookupVolumeResponse_VolumeIdLocation{
VolumeOrFileId: result.VolumeOrFileId,
@@ -130,10 +139,13 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
MemoryMapMaxSizeMb: req.MemoryMapMaxSizeMb,
}
- if ms.shouldVolumeGrow(option) {
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
+
+ if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) {
if ms.Topo.AvailableSpaceFor(option) <= 0 {
return nil, fmt.Errorf("no free volumes left for " + option.String())
}
+ vl.AddGrowRequest()
ms.vgCh <- &topology.VolumeGrowRequest{
Option: option,
Count: int(req.WritableVolumeCount),
@@ -147,14 +159,27 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
)
for time.Now().Sub(startTime) < maxTimeout {
- fid, count, dn, err := ms.Topo.PickForWrite(req.Count, option)
+ fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option)
if err == nil {
+ dn := dnList.Head()
+ var replicas []*master_pb.Location
+ for _, r := range dnList.Rest() {
+ replicas = append(replicas, &master_pb.Location{
+ Url: r.Url(),
+ PublicUrl: r.PublicUrl,
+ GrpcPort: uint32(r.GrpcPort),
+ })
+ }
return &master_pb.AssignResponse{
- Fid: fid,
- Url: dn.Url(),
- PublicUrl: dn.PublicUrl,
- Count: count,
- Auth: string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)),
+ Fid: fid,
+ Location: &master_pb.Location{
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
+ GrpcPort: uint32(dn.GrpcPort),
+ },
+ Count: count,
+ Auth: string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)),
+ Replicas: replicas,
}, nil
}
//glog.V(4).Infoln("waiting for volume growing...")
@@ -248,7 +273,7 @@ func (ms *MasterServer) VacuumVolume(ctx context.Context, req *master_pb.VacuumV
resp := &master_pb.VacuumVolumeResponse{}
- ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), ms.preallocateSize)
+ ms.Topo.Vacuum(ms.grpcDialOption, float64(req.GarbageThreshold), req.VolumeId, req.Collection, ms.preallocateSize)
return resp, nil
}
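
The HasGrowRequest/AddGrowRequest/DoneGrowRequest calls replace the old shouldVolumeGrow check with a per-layout in-flight guard, so concurrent Assign calls queue at most one grow request per volume layout. A self-contained sketch of the same guard, with hypothetical names and an assumed atomic-counter implementation (the real topology code may differ):

package main

import (
	"fmt"
	"sync/atomic"
)

type volumeLayout struct {
	growRequestCount int32 // >0 while a grow request is queued or in progress
}

func (vl *volumeLayout) HasGrowRequest() bool { return atomic.LoadInt32(&vl.growRequestCount) > 0 }
func (vl *volumeLayout) AddGrowRequest()      { atomic.AddInt32(&vl.growRequestCount, 1) }
func (vl *volumeLayout) DoneGrowRequest()     { atomic.AddInt32(&vl.growRequestCount, -1) }

func main() {
	vl := &volumeLayout{}
	// Mirrors Assign: only enqueue when nothing is already in flight.
	if !vl.HasGrowRequest() {
		vl.AddGrowRequest()
		go func() {
			defer vl.DoneGrowRequest()
			// ... grow volumes, then broadcast the new locations ...
		}()
	}
	fmt.Println("grow request in flight:", vl.HasGrowRequest())
}
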
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index d2edeb6cb..9bf840f08 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -1,7 +1,9 @@
package weed_server
import (
+ "context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/stats"
"net/http"
"net/http/httputil"
"net/url"
@@ -11,8 +13,12 @@ import (
"sync"
"time"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+
"github.com/chrislusf/raft"
"github.com/gorilla/mux"
+ hashicorpRaft "github.com/hashicorp/raft"
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -26,14 +32,13 @@ import (
)
const (
- SequencerType = "master.sequencer.type"
- SequencerEtcdUrls = "master.sequencer.sequencer_etcd_urls"
- SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
+ SequencerType = "master.sequencer.type"
+ SequencerSnowflakeId = "master.sequencer.sequencer_snowflake_id"
+ RaftServerRemovalTime = 72 * time.Minute
)
type MasterOption struct {
- Host string
- Port int
+ Master pb.ServerAddress
MetaFolder string
VolumeSizeLimitMB uint32
VolumePreallocate bool
@@ -48,6 +53,7 @@ type MasterOption struct {
}
type MasterServer struct {
+ master_pb.UnimplementedSeaweedServer
option *MasterOption
guard *security.Guard
@@ -59,18 +65,23 @@ type MasterServer struct {
boundedLeaderChan chan int
+ onPeerUpdateDoneCh chan string
+ onPeerUpdateDoneChExist bool
+
// notifying clients
clientChansLock sync.RWMutex
- clientChans map[string]chan *master_pb.VolumeLocation
+ clientChans map[string]chan *master_pb.KeepConnectedResponse
grpcDialOption grpc.DialOption
MasterClient *wdclient.MasterClient
adminLocks *AdminLocks
+
+ Cluster *cluster.Cluster
}
-func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *MasterServer {
+func NewMasterServer(r *mux.Router, option *MasterOption, peers map[string]pb.ServerAddress) *MasterServer {
v := util.GetViper()
signingKey := v.GetString("jwt.signing.key")
@@ -100,12 +111,16 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
option: option,
preallocateSize: preallocateSize,
vgCh: make(chan *topology.VolumeGrowRequest, 1<<6),
- clientChans: make(map[string]chan *master_pb.VolumeLocation),
+ clientChans: make(map[string]chan *master_pb.KeepConnectedResponse),
grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "", cluster.MasterType, option.Master, "", peers),
adminLocks: NewAdminLocks(),
+ Cluster: cluster.NewCluster(),
}
ms.boundedLeaderChan = make(chan int, 16)
+ ms.onPeerUpdateDoneCh = make(chan string)
+
+ ms.MasterClient.OnPeerUpdate = ms.OnPeerUpdate
seq := ms.createSequencer(option)
if nil == seq {
@@ -154,18 +169,41 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
}
func (ms *MasterServer) SetRaftServer(raftServer *RaftServer) {
- ms.Topo.RaftServer = raftServer.raftServer
- ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
- glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
- if ms.Topo.RaftServer.Leader() != "" {
- glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
- }
- })
+ var raftServerName string
+ if raftServer.raftServer != nil {
+ ms.Topo.RaftServer = raftServer.raftServer
+ ms.Topo.RaftServer.AddEventListener(raft.LeaderChangeEventType, func(e raft.Event) {
+ glog.V(0).Infof("leader change event: %+v => %+v", e.PrevValue(), e.Value())
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", e.Value())).Inc()
+ if ms.Topo.RaftServer.Leader() != "" {
+ glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "becomes leader.")
+ }
+ })
+ raftServerName = ms.Topo.RaftServer.Name()
+ } else if raftServer.RaftHashicorp != nil {
+ ms.Topo.HashicorpRaft = raftServer.RaftHashicorp
+ leaderCh := raftServer.RaftHashicorp.LeaderCh()
+ prevLeader := ms.Topo.HashicorpRaft.Leader()
+ go func() {
+ for {
+ select {
+ case isLeader := <-leaderCh:
+ leader := ms.Topo.HashicorpRaft.Leader()
+ glog.V(0).Infof("is leader %+v change event: %+v => %+v", isLeader, prevLeader, leader)
+ stats.MasterLeaderChangeCounter.WithLabelValues(fmt.Sprintf("%+v", leader)).Inc()
+ prevLeader = leader
+ }
+ }
+ }()
+ raftServerName = ms.Topo.HashicorpRaft.String()
+ }
if ms.Topo.IsLeader() {
- glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", "I am the leader!")
+ glog.V(0).Infoln("[", raftServerName, "]", "I am the leader!")
} else {
- if ms.Topo.RaftServer.Leader() != "" {
+ if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
glog.V(0).Infoln("[", ms.Topo.RaftServer.Name(), "]", ms.Topo.RaftServer.Leader(), "is the leader.")
+ } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
+ glog.V(0).Infoln("[", ms.Topo.HashicorpRaft.String(), "]", ms.Topo.HashicorpRaft.Leader(), "is the leader.")
}
}
}
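
With hashicorp/raft there is no event-listener API; leadership changes arrive as booleans on LeaderCh(), which is why the code above runs a dedicated goroutine. A minimal watcher in the same shape:

package raftwatch

import (
	"log"

	"github.com/hashicorp/raft"
)

// watchLeadership logs every leadership transition observed on LeaderCh.
func watchLeadership(r *raft.Raft) {
	go func() {
		for isLeader := range r.LeaderCh() {
			log.Printf("leadership changed: isLeader=%v leader=%v", isLeader, r.Leader())
		}
	}()
}
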
@@ -174,71 +212,70 @@ func (ms *MasterServer) proxyToLeader(f http.HandlerFunc) http.HandlerFunc {
return func(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
f(w, r)
- } else if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
- ms.boundedLeaderChan <- 1
- defer func() { <-ms.boundedLeaderChan }()
- targetUrl, err := url.Parse("http://" + ms.Topo.RaftServer.Leader())
- if err != nil {
- writeJsonError(w, r, http.StatusInternalServerError,
- fmt.Errorf("Leader URL http://%s Parse Error: %v", ms.Topo.RaftServer.Leader(), err))
- return
- }
- glog.V(4).Infoln("proxying to leader", ms.Topo.RaftServer.Leader())
- proxy := httputil.NewSingleHostReverseProxy(targetUrl)
- director := proxy.Director
- proxy.Director = func(req *http.Request) {
- actualHost, err := security.GetActualRemoteHost(req)
- if err == nil {
- req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost)
- }
- director(req)
- }
- proxy.Transport = util.Transport
- proxy.ServeHTTP(w, r)
- } else {
- // handle requests locally
+ return
+ }
+ var raftServerLeader string
+ if ms.Topo.RaftServer != nil && ms.Topo.RaftServer.Leader() != "" {
+ raftServerLeader = ms.Topo.RaftServer.Leader()
+ } else if ms.Topo.HashicorpRaft != nil && ms.Topo.HashicorpRaft.Leader() != "" {
+ raftServerLeader = string(ms.Topo.HashicorpRaft.Leader())
+ }
+ if raftServerLeader == "" {
f(w, r)
+ return
+ }
+ ms.boundedLeaderChan <- 1
+ defer func() { <-ms.boundedLeaderChan }()
+ targetUrl, err := url.Parse("http://" + raftServerLeader)
+ if err != nil {
+ writeJsonError(w, r, http.StatusInternalServerError,
+ fmt.Errorf("Leader URL http://%s Parse Error: %v", raftServerLeader, err))
+ return
}
+ glog.V(4).Infoln("proxying to leader", raftServerLeader)
+ proxy := httputil.NewSingleHostReverseProxy(targetUrl)
+ director := proxy.Director
+ proxy.Director = func(req *http.Request) {
+ actualHost, err := security.GetActualRemoteHost(req)
+ if err == nil {
+ req.Header.Set("HTTP_X_FORWARDED_FOR", actualHost)
+ }
+ director(req)
+ }
+ proxy.Transport = util.Transport
+ proxy.ServeHTTP(w, r)
}
}
func (ms *MasterServer) startAdminScripts() {
- var err error
v := util.GetViper()
adminScripts := v.GetString("master.maintenance.scripts")
- glog.V(0).Infof("adminScripts:\n%v", adminScripts)
if adminScripts == "" {
return
}
+ glog.V(0).Infof("adminScripts: %v", adminScripts)
v.SetDefault("master.maintenance.sleep_minutes", 17)
sleepMinutes := v.GetInt("master.maintenance.sleep_minutes")
- v.SetDefault("master.filer.default", "localhost:8888")
- filerHostPort := v.GetString("master.filer.default")
-
scriptLines := strings.Split(adminScripts, "\n")
if !strings.Contains(adminScripts, "lock") {
scriptLines = append(append([]string{}, "lock"), scriptLines...)
scriptLines = append(scriptLines, "unlock")
}
- masterAddress := fmt.Sprintf("%s:%d", ms.option.Host, ms.option.Port)
+ masterAddress := string(ms.option.Master)
var shellOptions shell.ShellOptions
shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
shellOptions.Masters = &masterAddress
- shellOptions.FilerHost, shellOptions.FilerPort, err = util.ParseHostPort(filerHostPort)
- shellOptions.FilerAddress = filerHostPort
shellOptions.Directory = "/"
- if err != nil {
- glog.V(0).Infof("failed to parse master.filer.default = %s : %v\n", filerHostPort, err)
- return
- }
+ emptyFilerGroup := ""
+ shellOptions.FilerGroup = &emptyFilerGroup
- commandEnv := shell.NewCommandEnv(shellOptions)
+ commandEnv := shell.NewCommandEnv(&shellOptions)
reg, _ := regexp.Compile(`'.*?'|".*?"|\S+`)
@@ -247,9 +284,13 @@ func (ms *MasterServer) startAdminScripts() {
go func() {
commandEnv.MasterClient.WaitUntilConnected()
- c := time.Tick(time.Duration(sleepMinutes) * time.Minute)
- for range c {
+ for {
+ time.Sleep(time.Duration(sleepMinutes) * time.Minute)
if ms.Topo.IsLeader() {
+ shellOptions.FilerAddress = ms.GetOneFiler(cluster.FilerGroup(*shellOptions.FilerGroup))
+ if shellOptions.FilerAddress == "" {
+ continue
+ }
for _, line := range scriptLines {
for _, c := range strings.Split(line, ";") {
processEachCmd(reg, c, commandEnv)
@@ -287,19 +328,10 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
seqType := strings.ToLower(v.GetString(SequencerType))
glog.V(1).Infof("[%s] : [%s]", SequencerType, seqType)
switch strings.ToLower(seqType) {
- case "etcd":
- var err error
- urls := v.GetString(SequencerEtcdUrls)
- glog.V(0).Infof("[%s] : [%s]", SequencerEtcdUrls, urls)
- seq, err = sequence.NewEtcdSequencer(urls, option.MetaFolder)
- if err != nil {
- glog.Error(err)
- seq = nil
- }
case "snowflake":
var err error
snowflakeId := v.GetInt(SequencerSnowflakeId)
- seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port), snowflakeId)
+ seq, err = sequence.NewSnowflakeSequencer(string(option.Master), snowflakeId)
if err != nil {
glog.Error(err)
seq = nil
@@ -309,3 +341,57 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
}
return seq
}
+
+func (ms *MasterServer) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, startFrom time.Time) {
+ if update.NodeType != cluster.MasterType || ms.Topo.HashicorpRaft == nil {
+ return
+ }
+ glog.V(4).Infof("OnPeerUpdate: %+v", update)
+
+ peerAddress := pb.ServerAddress(update.Address)
+ peerName := string(peerAddress)
+ isLeader := ms.Topo.HashicorpRaft.State() == hashicorpRaft.Leader
+ if update.IsAdd {
+ if isLeader {
+ raftServerFound := false
+ for _, server := range ms.Topo.HashicorpRaft.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerName {
+ raftServerFound = true
+ }
+ }
+ if !raftServerFound {
+ glog.V(0).Infof("adding new raft server: %s", peerName)
+ ms.Topo.HashicorpRaft.AddVoter(
+ hashicorpRaft.ServerID(peerName),
+ hashicorpRaft.ServerAddress(peerAddress.ToGrpcAddress()), 0, 0)
+ }
+ }
+ if ms.onPeerUpdateDoneChExist {
+ ms.onPeerUpdateDoneCh <- peerName
+ }
+ } else if isLeader {
+ go func(peerName string) {
+ for {
+ select {
+ case <-time.After(RaftServerRemovalTime):
+ err := ms.MasterClient.WithClient(false, func(client master_pb.SeaweedClient) error {
+ _, err := client.RaftRemoveServer(context.Background(), &master_pb.RaftRemoveServerRequest{
+ Id: peerName,
+ Force: false,
+ })
+ return err
+ })
+ if err != nil {
+ glog.Warningf("failed to removing old raft server %s: %v", peerName, err)
+ }
+ return
+ case peerDone := <-ms.onPeerUpdatDoneCn:
+ if peerName == peerDone {
+ return
+ }
+ }
+ }
+ }(peerName)
+ ms.onPeerUpdateDoneChExist = true
+ }
+}
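
OnPeerUpdate gives each departed master a grace period (RaftServerRemovalTime) before asking the leader to remove it from the raft configuration, and cancels the pending removal if the peer re-announces itself on the done channel first. The same select shape in isolation, with a hypothetical gracePeriod parameter:

package peerwatch

import "time"

// watchDeparted removes a departed peer after gracePeriod unless the peer
// re-announces itself on rejoined first.
func watchDeparted(peer string, rejoined <-chan string, gracePeriod time.Duration, remove func(string)) {
	go func() {
		for {
			select {
			case <-time.After(gracePeriod):
				remove(peer) // grace period elapsed without the peer returning
				return
			case name := <-rejoined:
				if name == peer {
					return // the peer came back; abandon the removal
				}
			}
		}
	}()
}
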
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index 2a1f6d523..0b79c4ed5 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -113,13 +113,16 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
return
}
- if ms.shouldVolumeGrow(option) {
+ vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
+
+ if !vl.HasGrowRequest() && vl.ShouldGrowVolumes(option) {
glog.V(0).Infof("dirAssign volume growth %v from %v", option.String(), r.RemoteAddr)
if ms.Topo.AvailableSpaceFor(option) <= 0 {
writeJsonQuiet(w, r, http.StatusNotFound, operation.AssignResult{Error: "No free volumes left for " + option.String()})
return
}
errCh := make(chan error, 1)
+ vl.AddGrowRequest()
ms.vgCh <- &topology.VolumeGrowRequest{
Option: option,
Count: writableVolumeCount,
@@ -130,9 +133,10 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
return
}
}
- fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option)
+ fid, count, dnList, err := ms.Topo.PickForWrite(requestedCount, option)
if err == nil {
ms.maybeAddJwtAuthorization(w, fid, true)
+ dn := dnList.Head()
writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count})
} else {
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
@@ -145,9 +149,9 @@ func (ms *MasterServer) maybeAddJwtAuthorization(w http.ResponseWriter, fileId s
}
var encodedJwt security.EncodedJwt
if isWrite {
- encodedJwt = security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fileId)
+ encodedJwt = security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fileId)
} else {
- encodedJwt = security.GenJwt(ms.guard.ReadSigningKey, ms.guard.ReadExpiresAfterSec, fileId)
+ encodedJwt = security.GenJwtForVolumeServer(ms.guard.ReadSigningKey, ms.guard.ReadExpiresAfterSec, fileId)
}
if encodedJwt == "" {
return
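
The rename from GenJwt to GenJwtForVolumeServer separates volume-server tokens from the filer-signed variety; as above, writes are signed with SigningKey and reads with ReadSigningKey. A hedged sketch of how a caller might attach the returned token to an upload request; the BEARER header form is an assumption, not something this diff shows:

package jwtsketch

import (
	"net/http"

	"github.com/chrislusf/seaweedfs/weed/security"
)

// attachJwt sets the token on a volume-server request, skipping the header
// entirely when signing is disabled (empty token).
func attachJwt(req *http.Request, jwt security.EncodedJwt) {
	if jwt != "" {
		req.Header.Set("Authorization", "BEARER "+string(jwt))
	}
}
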
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 4a86348d9..47abfb892 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -3,6 +3,8 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
"math/rand"
"net/http"
"strconv"
@@ -26,7 +28,7 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
for _, server := range collection.ListVolumeServers() {
- err := operation.WithVolumeServerClient(server.Url(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(false, server.ServerAddress(), ms.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
_, deleteErr := client.DeleteCollection(context.Background(), &volume_server_pb.DeleteCollectionRequest{
Collection: collection.Name,
})
@@ -63,7 +65,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
}
}
// glog.Infoln("garbageThreshold =", gcThreshold)
- ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, ms.preallocateSize)
+ ms.Topo.Vacuum(ms.grpcDialOption, gcThreshold, 0, "", ms.preallocateSize)
ms.dirStatusHandler(w, r)
}
@@ -80,7 +82,9 @@ func (ms *MasterServer) volumeGrowHandler(w http.ResponseWriter, r *http.Request
if ms.Topo.AvailableSpaceFor(option) < int64(count*option.ReplicaPlacement.GetCopyCount()) {
err = fmt.Errorf("only %d volumes left, not enough for %d", ms.Topo.AvailableSpaceFor(option), count*option.ReplicaPlacement.GetCopyCount())
} else {
- count, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
+ var newVidLocations []*master_pb.VolumeLocation
+ newVidLocations, err = ms.vg.GrowByCountAndType(ms.grpcDialOption, count, option, ms.Topo)
+ count = len(newVidLocations)
}
} else {
err = fmt.Errorf("can not parse parameter count %s", r.FormValue("count"))
@@ -118,32 +122,19 @@ func (ms *MasterServer) redirectHandler(w http.ResponseWriter, r *http.Request)
}
}
-func (ms *MasterServer) selfUrl(r *http.Request) string {
- if r.Host != "" {
- return r.Host
- }
- return "localhost:" + strconv.Itoa(ms.option.Port)
-}
func (ms *MasterServer) submitFromMasterServerHandler(w http.ResponseWriter, r *http.Request) {
if ms.Topo.IsLeader() {
- submitForClientHandler(w, r, func() string { return ms.selfUrl(r) }, ms.grpcDialOption)
+ submitForClientHandler(w, r, func() pb.ServerAddress { return ms.option.Master }, ms.grpcDialOption)
} else {
masterUrl, err := ms.Topo.Leader()
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
} else {
- submitForClientHandler(w, r, func() string { return masterUrl }, ms.grpcDialOption)
+ submitForClientHandler(w, r, func() pb.ServerAddress { return masterUrl }, ms.grpcDialOption)
}
}
}
-func (ms *MasterServer) shouldVolumeGrow(option *topology.VolumeGrowOption) bool {
- vl := ms.Topo.GetVolumeLayout(option.Collection, option.ReplicaPlacement, option.Ttl, option.DiskType)
- active, high := vl.GetActiveVolumeCount(option)
- //glog.V(0).Infof("active volume: %d, high usage volume: %d\n", active, high)
- return active <= high
-}
-
func (ms *MasterServer) getVolumeGrowOption(r *http.Request) (*topology.VolumeGrowOption, error) {
replicationString := r.FormValue("replication")
if replicationString == "" {
diff --git a/weed/server/master_server_handlers_ui.go b/weed/server/master_server_handlers_ui.go
index 015bfbd00..d8260d8d2 100644
--- a/weed/server/master_server_handlers_ui.go
+++ b/weed/server/master_server_handlers_ui.go
@@ -5,6 +5,8 @@ import (
"time"
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
+
ui "github.com/chrislusf/seaweedfs/weed/server/master_ui"
"github.com/chrislusf/seaweedfs/weed/stats"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -13,20 +15,40 @@ import (
func (ms *MasterServer) uiStatusHandler(w http.ResponseWriter, r *http.Request) {
infos := make(map[string]interface{})
infos["Up Time"] = time.Now().Sub(startTime).String()
- args := struct {
- Version string
- Topology interface{}
- RaftServer raft.Server
- Stats map[string]interface{}
- Counters *stats.ServerStats
- VolumeSizeLimitMB uint32
- }{
- util.Version(),
- ms.Topo.ToMap(),
- ms.Topo.RaftServer,
- infos,
- serverStats,
- ms.option.VolumeSizeLimitMB,
+ infos["Max Volume Id"] = ms.Topo.GetMaxVolumeId()
+ if ms.Topo.RaftServer != nil {
+ args := struct {
+ Version string
+ Topology interface{}
+ RaftServer raft.Server
+ Stats map[string]interface{}
+ Counters *stats.ServerStats
+ VolumeSizeLimitMB uint32
+ }{
+ util.Version(),
+ ms.Topo.ToMap(),
+ ms.Topo.RaftServer,
+ infos,
+ serverStats,
+ ms.option.VolumeSizeLimitMB,
+ }
+ ui.StatusTpl.Execute(w, args)
+ } else if ms.Topo.HashicorpRaft != nil {
+ args := struct {
+ Version string
+ Topology interface{}
+ RaftServer *hashicorpRaft.Raft
+ Stats map[string]interface{}
+ Counters *stats.ServerStats
+ VolumeSizeLimitMB uint32
+ }{
+ util.Version(),
+ ms.Topo.ToMap(),
+ ms.Topo.HashicorpRaft,
+ infos,
+ serverStats,
+ ms.option.VolumeSizeLimitMB,
+ }
+ ui.StatusNewRaftTpl.Execute(w, args)
}
- ui.StatusTpl.Execute(w, args)
}
diff --git a/weed/server/master_ui/masterNewRaft.html b/weed/server/master_ui/masterNewRaft.html
new file mode 100644
index 000000000..32afdceac
--- /dev/null
+++ b/weed/server/master_ui/masterNewRaft.html
@@ -0,0 +1,121 @@
+<!DOCTYPE html>
+<html>
+<head>
+ <title>SeaweedFS {{ .Version }}</title>
+ <link rel="stylesheet" href="/seaweedfsstatic/bootstrap/3.3.1/css/bootstrap.min.css">
+</head>
+<body>
+<div class="container">
+ <div class="page-header">
+ <h1>
+ <a href="https://github.com/chrislusf/seaweedfs"><img src="/seaweedfsstatic/seaweed50x50.png"></img></a>
+ SeaweedFS <small>{{ .Version }}</small>
+ </h1>
+ </div>
+
+ <div class="row">
+ <div class="col-sm-6">
+ <h2>Cluster status</h2>
+ <table class="table table-condensed table-striped">
+ <tbody>
+ <tr>
+ <th>Volume Size Limit</th>
+ <td>{{ .VolumeSizeLimitMB }}MB</td>
+ </tr>
+ <tr>
+ <th>Free</th>
+ <td>{{ .Topology.Free }}</td>
+ </tr>
+ <tr>
+ <th>Max</th>
+ <td>{{ .Topology.Max }}</td>
+ </tr>
+ {{ with .RaftServer }}
+ <tr>
+ <th>Leader</th>
+ <td><a href="http://{{ .Leader }}">{{ .Leader }}</a></td>
+ </tr>
+ <tr>
+ <th>Other Masters</th>
+ <td class="col-sm-5">
+ <ul class="list-unstyled">
+ {{ range $k, $p := .GetConfiguration.Configuration.Servers }}
+ <li><a href="http://{{ $p.ID }}/ui/index.html">{{ $p.ID }}</a></li>
+ {{ end }}
+ </ul>
+ </td>
+ </tr>
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
+ <div class="col-sm-6">
+ <h2>System Stats</h2>
+ <table class="table table-condensed table-striped">
+ <tr>
+ <th>Concurrent Connections</th>
+ <td>{{ .Counters.Connections.WeekCounter.Sum }}</td>
+ </tr>
+ {{ range $key, $val := .Stats }}
+ <tr>
+ <th>{{ $key }}</th>
+ <td>{{ $val }}</td>
+ </tr>
+ {{ end }}
+ </table>
+ <h2>Raft Stats</h2>
+ <table class="table table-condensed table-striped">
+ <tr>
+ <th>applied_index</th>
+ <td>{{ .RaftServer.Stats.applied_index }}</td>
+ </tr>
+ <tr>
+ <th>last_log_term</th>
+ <td>{{ .RaftServer.Stats.last_log_term }}</td>
+ </tr>
+ </table>
+ </div>
+ </div>
+
+ <div class="row">
+ <h2>Topology</h2>
+ <table class="table table-striped">
+ <thead>
+ <tr>
+ <th>Data Center</th>
+ <th>Rack</th>
+ <th>RemoteAddr</th>
+ <th>#Volumes</th>
+ <th>Volume Ids</th>
+ <th>#ErasureCodingShards</th>
+ <th>Max</th>
+ </tr>
+ </thead>
+ <tbody>
+ {{ range $dc_index, $dc := .Topology.DataCenters }}
+ {{ range $rack_index, $rack := $dc.Racks }}
+ {{ range $dn_index, $dn := $rack.DataNodes }}
+ <tr>
+ <td><code>{{ $dc.Id }}</code></td>
+ <td>{{ $rack.Id }}</td>
+ <td><a href="http://{{ $dn.Url }}/ui/index.html">{{ $dn.Url }}</a>
+ {{ if ne $dn.PublicUrl $dn.Url }}
+ / <a href="http://{{ $dn.PublicUrl }}/ui/index.html">{{ $dn.PublicUrl }}</a>
+ {{ end }}
+ </td>
+ <td>{{ $dn.Volumes }}</td>
+ <td>{{ $dn.VolumeIds}}</td>
+ <td>{{ $dn.EcShards }}</td>
+ <td>{{ $dn.Max }}</td>
+ </tr>
+ {{ end }}
+ {{ end }}
+ {{ end }}
+ </tbody>
+ </table>
+ </div>
+
+</div>
+</body>
+</html>
diff --git a/weed/server/master_ui/templates.go b/weed/server/master_ui/templates.go
index 415022b97..a6dcc57d7 100644
--- a/weed/server/master_ui/templates.go
+++ b/weed/server/master_ui/templates.go
@@ -8,4 +8,8 @@ import (
//go:embed master.html
var masterHtml string
+//go:embed masterNewRaft.html
+var masterNewRaftHtml string
+
var StatusTpl = template.Must(template.New("status").Parse(masterHtml))
+var StatusNewRaftTpl = template.Must(template.New("status").Parse(masterNewRaftHtml))
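
Both templates are embedded at build time and parsed once at package init; template.Must panics on a parse error, so a broken template fails at process start rather than on the first request. The same embed-and-parse pattern in a standalone sketch (status.html is a hypothetical file name):

package main

import (
	_ "embed"
	"html/template"
	"os"
)

// status.html is a hypothetical stand-in for master.html / masterNewRaft.html.
//
//go:embed status.html
var statusHtml string

// Parsed once at init; a syntax error panics at startup, not per request.
var statusTpl = template.Must(template.New("status").Parse(statusHtml))

func main() {
	_ = statusTpl.Execute(os.Stdout, map[string]string{"Version": "dev"})
}
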
diff --git a/weed/server/raft_hashicorp.go b/weed/server/raft_hashicorp.go
new file mode 100644
index 000000000..9971eaa48
--- /dev/null
+++ b/weed/server/raft_hashicorp.go
@@ -0,0 +1,186 @@
+package weed_server
+
+// https://yusufs.medium.com/creating-distributed-kv-database-by-implementing-raft-consensus-using-golang-d0884eef2e28
+// https://github.com/Jille/raft-grpc-example/blob/cd5bcab0218f008e044fbeee4facdd01b06018ad/application.go#L18
+
+import (
+ "fmt"
+ transport "github.com/Jille/raft-grpc-transport"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/hashicorp/raft"
+ boltdb "github.com/hashicorp/raft-boltdb"
+ "google.golang.org/grpc"
+ "math/rand"
+ "os"
+ "path"
+ "path/filepath"
+ "sort"
+ "strings"
+ "time"
+)
+
+const (
+ ldbFile = "logs.dat"
+ sdbFile = "stable.dat"
+ updatePeersTimeout = 15 * time.Minute
+)
+
+func getPeerIdx(self pb.ServerAddress, mapPeers map[string]pb.ServerAddress) int {
+ peers := make([]pb.ServerAddress, 0, len(mapPeers))
+ for _, peer := range mapPeers {
+ peers = append(peers, peer)
+ }
+ sort.Slice(peers, func(i, j int) bool {
+ return strings.Compare(string(peers[i]), string(peers[j])) < 0
+ })
+ for i, peer := range peers {
+ if string(peer) == string(self) {
+ return i
+ }
+ }
+ return -1
+}
+
+func (s *RaftServer) AddPeersConfiguration() (cfg raft.Configuration) {
+ for _, peer := range s.peers {
+ cfg.Servers = append(cfg.Servers, raft.Server{
+ Suffrage: raft.Voter,
+ ID: raft.ServerID(peer),
+ Address: raft.ServerAddress(peer.ToGrpcAddress()),
+ })
+ }
+ return cfg
+}
+
+func (s *RaftServer) UpdatePeers() {
+ for {
+ select {
+ case isLeader := <-s.RaftHashicorp.LeaderCh():
+ if isLeader {
+ peerLeader := string(s.serverAddr)
+ existsPeerName := make(map[string]bool)
+ for _, server := range s.RaftHashicorp.GetConfiguration().Configuration().Servers {
+ if string(server.ID) == peerLeader {
+ continue
+ }
+ existsPeerName[string(server.ID)] = true
+ }
+ for _, peer := range s.peers {
+ peerName := string(peer)
+ if peerName == peerLeader || existsPeerName[peerName] {
+ continue
+ }
+ glog.V(0).Infof("adding new peer: %s", peerName)
+ s.RaftHashicorp.AddVoter(
+ raft.ServerID(peerName), raft.ServerAddress(peer.ToGrpcAddress()), 0, 0)
+ }
+ for peer := range existsPeerName {
+ if _, found := s.peers[peer]; !found {
+ glog.V(0).Infof("removing old peer: %s", peer)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peer), 0, 0)
+ }
+ }
+ if _, found := s.peers[peerLeader]; !found {
+ glog.V(0).Infof("removing old leader peer: %s", peerLeader)
+ s.RaftHashicorp.RemoveServer(raft.ServerID(peerLeader), 0, 0)
+ }
+ }
+ return
+ case <-time.After(updatePeersTimeout):
+ return
+ }
+ }
+}
+
+func NewHashicorpRaftServer(option *RaftServerOption) (*RaftServer, error) {
+ s := &RaftServer{
+ peers: option.Peers,
+ serverAddr: option.ServerAddr,
+ dataDir: option.DataDir,
+ topo: option.Topo,
+ }
+
+ c := raft.DefaultConfig()
+ c.LocalID = raft.ServerID(s.serverAddr) // TODO maybe the IP:port address will change
+ c.HeartbeatTimeout = time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
+ c.ElectionTimeout = option.ElectionTimeout
+ if c.LeaderLeaseTimeout > c.HeartbeatTimeout {
+ c.LeaderLeaseTimeout = c.HeartbeatTimeout
+ }
+ if glog.V(4) {
+ c.LogLevel = "Debug"
+ } else if glog.V(2) {
+ c.LogLevel = "Info"
+ } else if glog.V(1) {
+ c.LogLevel = "Warn"
+ } else if glog.V(0) {
+ c.LogLevel = "Error"
+ }
+
+ if option.RaftBootstrap {
+ os.RemoveAll(path.Join(s.dataDir, ldbFile))
+ os.RemoveAll(path.Join(s.dataDir, sdbFile))
+ os.RemoveAll(path.Join(s.dataDir, "snapshots"))
+ }
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshots"), os.ModePerm); err != nil {
+ return nil, err
+ }
+ baseDir := s.dataDir
+
+ ldb, err := boltdb.NewBoltStore(filepath.Join(baseDir, ldbFile))
+ if err != nil {
+ return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "logs.dat"), err)
+ }
+
+ sdb, err := boltdb.NewBoltStore(filepath.Join(baseDir, sdbFile))
+ if err != nil {
+ return nil, fmt.Errorf(`boltdb.NewBoltStore(%q): %v`, filepath.Join(baseDir, "stable.dat"), err)
+ }
+
+ fss, err := raft.NewFileSnapshotStore(baseDir, 3, os.Stderr)
+ if err != nil {
+ return nil, fmt.Errorf(`raft.NewFileSnapshotStore(%q, ...): %v`, baseDir, err)
+ }
+
+ s.TransportManager = transport.New(raft.ServerAddress(s.serverAddr), []grpc.DialOption{option.GrpcDialOption})
+
+ stateMachine := StateMachine{topo: option.Topo}
+ s.RaftHashicorp, err = raft.NewRaft(c, &stateMachine, ldb, sdb, fss, s.TransportManager.Transport())
+ if err != nil {
+ return nil, fmt.Errorf("raft.NewRaft: %v", err)
+ }
+ if option.RaftBootstrap || len(s.RaftHashicorp.GetConfiguration().Configuration().Servers) == 0 {
+ cfg := s.AddPeersConfiguration()
+ // Need to get lock, in case all servers do this at the same time.
+ peerIdx := getPeerIdx(s.serverAddr, s.peers)
+ timeSleep := time.Duration(float64(c.LeaderLeaseTimeout) * (rand.Float64()*0.25 + 1) * float64(peerIdx))
+ glog.V(0).Infof("Bootstrapping idx: %d sleep: %v new cluster: %+v", peerIdx, timeSleep, cfg)
+ time.Sleep(timeSleep)
+ f := s.RaftHashicorp.BootstrapCluster(cfg)
+ if err := f.Error(); err != nil {
+ return nil, fmt.Errorf("raft.Raft.BootstrapCluster: %v", err)
+ }
+ } else {
+ go s.UpdatePeers()
+ }
+
+ ticker := time.NewTicker(c.HeartbeatTimeout * 10)
+ if glog.V(4) {
+ go func() {
+ for {
+ select {
+ case <-ticker.C:
+ cfuture := s.RaftHashicorp.GetConfiguration()
+ if err = cfuture.Error(); err != nil {
+ glog.Fatalf("error getting config: %s", err)
+ }
+ configuration := cfuture.Configuration()
+ glog.V(4).Infof("Showing peers known by %s:\n%+v", s.RaftHashicorp.String(), configuration.Servers)
+ }
+ }
+ }()
+ }
+
+ return s, nil
+}
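
getPeerIdx gives every master an identical total order over the peer set, so the rank-proportional, jittered sleep before BootstrapCluster staggers the nodes and the lowest-ranked peer almost always bootstraps first. A standalone sketch of the same ordering, using plain strings in place of pb.ServerAddress:

package main

import (
	"fmt"
	"sort"
)

// peerIdx returns this node's rank in the sorted peer list, or -1 if absent.
func peerIdx(self string, peers map[string]string) int {
	sorted := make([]string, 0, len(peers))
	for _, p := range peers {
		sorted = append(sorted, p)
	}
	sort.Strings(sorted)
	for i, p := range sorted {
		if p == self {
			return i
		}
	}
	return -1
}

func main() {
	peers := map[string]string{"a": "m1:9333", "b": "m2:9333", "c": "m3:9333"}
	fmt.Println(peerIdx("m2:9333", peers)) // 1: every node computes the same rank
}
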
diff --git a/weed/server/raft_server.go b/weed/server/raft_server.go
index 85841e409..ad0a1c8ce 100644
--- a/weed/server/raft_server.go
+++ b/weed/server/raft_server.go
@@ -2,10 +2,12 @@ package weed_server
import (
"encoding/json"
+ transport "github.com/Jille/raft-grpc-transport"
+ "io"
+ "io/ioutil"
"math/rand"
"os"
"path"
- "sort"
"time"
"google.golang.org/grpc"
@@ -13,17 +15,32 @@ import (
"github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/raft"
+ hashicorpRaft "github.com/hashicorp/raft"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/topology"
)
+type RaftServerOption struct {
+ GrpcDialOption grpc.DialOption
+ Peers map[string]pb.ServerAddress
+ ServerAddr pb.ServerAddress
+ DataDir string
+ Topo *topology.Topology
+ RaftResumeState bool
+ HeartbeatInterval time.Duration
+ ElectionTimeout time.Duration
+ RaftBootstrap bool
+}
+
type RaftServer struct {
- peers []string // initial peers to join with
- raftServer raft.Server
- dataDir string
- serverAddr string
- topo *topology.Topology
+ peers map[string]pb.ServerAddress // initial peers to join with
+ raftServer raft.Server
+ RaftHashicorp *hashicorpRaft.Raft
+ TransportManager *transport.Manager
+ dataDir string
+ serverAddr pb.ServerAddress
+ topo *topology.Topology
*raft.GrpcServer
}
@@ -32,6 +49,8 @@ type StateMachine struct {
topo *topology.Topology
}
+var _ hashicorpRaft.FSM = &StateMachine{}
+
func (s StateMachine) Save() ([]byte, error) {
state := topology.MaxVolumeIdCommand{
MaxVolumeId: s.topo.GetMaxVolumeId(),
@@ -51,12 +70,42 @@ func (s StateMachine) Recovery(data []byte) error {
return nil
}
-func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, dataDir string, topo *topology.Topology, raftResumeState bool) (*RaftServer, error) {
+func (s *StateMachine) Apply(l *hashicorpRaft.Log) interface{} {
+ before := s.topo.GetMaxVolumeId()
+ state := topology.MaxVolumeIdCommand{}
+ err := json.Unmarshal(l.Data, &state)
+ if err != nil {
+ return err
+ }
+ s.topo.UpAdjustMaxVolumeId(state.MaxVolumeId)
+
+ glog.V(1).Infoln("max volume id", before, "==>", s.topo.GetMaxVolumeId())
+ return nil
+}
+
+func (s *StateMachine) Snapshot() (hashicorpRaft.FSMSnapshot, error) {
+ return &topology.MaxVolumeIdCommand{
+ MaxVolumeId: s.topo.GetMaxVolumeId(),
+ }, nil
+}
+
+func (s *StateMachine) Restore(r io.ReadCloser) error {
+ b, err := ioutil.ReadAll(r)
+ if err != nil {
+ return err
+ }
+ if err := s.Recovery(b); err != nil {
+ return err
+ }
+ return nil
+}
+
+func NewRaftServer(option *RaftServerOption) (*RaftServer, error) {
s := &RaftServer{
- peers: peers,
- serverAddr: serverAddr,
- dataDir: dataDir,
- topo: topo,
+ peers: option.Peers,
+ serverAddr: option.ServerAddr,
+ dataDir: option.DataDir,
+ topo: option.Topo,
}
if glog.V(4) {
@@ -66,27 +115,29 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
raft.RegisterCommand(&topology.MaxVolumeIdCommand{})
var err error
- transporter := raft.NewGrpcTransporter(grpcDialOption)
- glog.V(0).Infof("Starting RaftServer with %v", serverAddr)
+ transporter := raft.NewGrpcTransporter(option.GrpcDialOption)
+ glog.V(0).Infof("Starting RaftServer with %v", option.ServerAddr)
- if !raftResumeState {
+ // always clear the previous log to avoid the server being promotable with stale entries
+ os.RemoveAll(path.Join(s.dataDir, "log"))
+ if !option.RaftResumeState {
// always clear previous metadata
os.RemoveAll(path.Join(s.dataDir, "conf"))
- os.RemoveAll(path.Join(s.dataDir, "log"))
os.RemoveAll(path.Join(s.dataDir, "snapshot"))
}
- if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), 0600); err != nil {
+ if err := os.MkdirAll(path.Join(s.dataDir, "snapshot"), os.ModePerm); err != nil {
return nil, err
}
- stateMachine := StateMachine{topo: topo}
- s.raftServer, err = raft.NewServer(s.serverAddr, s.dataDir, transporter, stateMachine, topo, "")
+ stateMachine := StateMachine{topo: option.Topo}
+ s.raftServer, err = raft.NewServer(string(s.serverAddr), s.dataDir, transporter, stateMachine, option.Topo, "")
if err != nil {
glog.V(0).Infoln(err)
return nil, err
}
- s.raftServer.SetHeartbeatInterval(time.Duration(300+rand.Intn(150)) * time.Millisecond)
- s.raftServer.SetElectionTimeout(10 * time.Second)
+ heartbeatInterval := time.Duration(float64(option.HeartbeatInterval) * (rand.Float64()*0.25 + 1))
+ s.raftServer.SetHeartbeatInterval(heartbeatInterval)
+ s.raftServer.SetElectionTimeout(option.ElectionTimeout)
if err := s.raftServer.LoadSnapshot(); err != nil {
return nil, err
}
@@ -94,68 +145,53 @@ func NewRaftServer(grpcDialOption grpc.DialOption, peers []string, serverAddr, d
return nil, err
}
- for _, peer := range s.peers {
- if err := s.raftServer.AddPeer(peer, pb.ServerToGrpcAddress(peer)); err != nil {
+ for name, peer := range s.peers {
+ if err := s.raftServer.AddPeer(name, peer.ToGrpcAddress()); err != nil {
return nil, err
}
}
// Remove deleted peers
for existsPeerName := range s.raftServer.Peers() {
- exists, existingPeer := false, ""
- for _, peer := range s.peers {
- if pb.ServerToGrpcAddress(peer) == existsPeerName {
- exists, existingPeer = true, peer
- break
- }
- }
- if exists {
+ if _, found := s.peers[existsPeerName]; !found {
if err := s.raftServer.RemovePeer(existsPeerName); err != nil {
glog.V(0).Infoln(err)
return nil, err
} else {
- glog.V(0).Infof("removing old peer %s", existingPeer)
+ glog.V(0).Infof("removing old peer: %s", existingPeer)
}
}
}
s.GrpcServer = raft.NewGrpcServer(s.raftServer)
- if s.raftServer.IsLogEmpty() && isTheFirstOne(serverAddr, s.peers) {
- // Initialize the server by joining itself.
- // s.DoJoinCommand()
- }
-
glog.V(0).Infof("current cluster leader: %v", s.raftServer.Leader())
return s, nil
}
func (s *RaftServer) Peers() (members []string) {
- peers := s.raftServer.Peers()
-
- for _, p := range peers {
- members = append(members, p.Name)
+ if s.raftServer != nil {
+ peers := s.raftServer.Peers()
+ for _, p := range peers {
+ members = append(members, p.Name)
+ }
+ } else if s.RaftHashicorp != nil {
+ cfg := s.RaftHashicorp.GetConfiguration()
+ for _, p := range cfg.Configuration().Servers {
+ members = append(members, string(p.ID))
+ }
}
-
return
}
-func isTheFirstOne(self string, peers []string) bool {
- sort.Strings(peers)
- if len(peers) <= 0 {
- return true
- }
- return self == peers[0]
-}
-
func (s *RaftServer) DoJoinCommand() {
glog.V(0).Infoln("Initializing new cluster")
if _, err := s.raftServer.Do(&raft.DefaultJoinCommand{
Name: s.raftServer.Name(),
- ConnectionString: pb.ServerToGrpcAddress(s.serverAddr),
+ ConnectionString: s.serverAddr.ToGrpcAddress(),
}); err != nil {
glog.Errorf("fail to send join command: %v", err)
}
diff --git a/weed/server/raft_server_handlers.go b/weed/server/raft_server_handlers.go
index 252570eab..cc3e6e37f 100644
--- a/weed/server/raft_server_handlers.go
+++ b/weed/server/raft_server_handlers.go
@@ -1,15 +1,16 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"net/http"
)
type ClusterStatusResult struct {
- IsLeader bool `json:"IsLeader,omitempty"`
- Leader string `json:"Leader,omitempty"`
- Peers []string `json:"Peers,omitempty"`
- MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"`
+ IsLeader bool `json:"IsLeader,omitempty"`
+ Leader pb.ServerAddress `json:"Leader,omitempty"`
+ Peers []string `json:"Peers,omitempty"`
+ MaxVolumeId needle.VolumeId `json:"MaxVolumeId,omitempty"`
}
func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
@@ -24,3 +25,11 @@ func (s *RaftServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
}
writeJsonQuiet(w, r, http.StatusOK, ret)
}
+
+func (s *RaftServer) StatsRaftHandler(w http.ResponseWriter, r *http.Request) {
+ if s.RaftHashicorp == nil {
+ writeJsonQuiet(w, r, http.StatusNotFound, nil)
+ return
+ }
+ writeJsonQuiet(w, r, http.StatusOK, s.RaftHashicorp.Stats())
+}
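
StatsRaftHandler simply serializes hashicorp/raft's Stats() map, while the existing status handler keeps returning ClusterStatusResult. A client-side sketch of consuming the latter; the /cluster/status path is an assumption, not shown in this diff:

package main

import (
	"encoding/json"
	"fmt"
	"net/http"
)

// clusterStatus mirrors ClusterStatusResult's JSON shape.
type clusterStatus struct {
	IsLeader    bool     `json:"IsLeader,omitempty"`
	Leader      string   `json:"Leader,omitempty"`
	Peers       []string `json:"Peers,omitempty"`
	MaxVolumeId uint32   `json:"MaxVolumeId,omitempty"`
}

func main() {
	resp, err := http.Get("http://localhost:9333/cluster/status") // path assumed
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	var s clusterStatus
	if err := json.NewDecoder(resp.Body).Decode(&s); err != nil {
		panic(err)
	}
	fmt.Printf("leader=%s isLeader=%v peers=%v\n", s.Leader, s.IsLeader, s.Peers)
}
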
diff --git a/weed/server/volume_grpc_admin.go b/weed/server/volume_grpc_admin.go
index 898c3da12..2ffdf2226 100644
--- a/weed/server/volume_grpc_admin.go
+++ b/weed/server/volume_grpc_admin.go
@@ -3,7 +3,13 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/cluster"
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
"path/filepath"
+ "time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
@@ -183,7 +189,12 @@ func (vs *VolumeServer) VolumeStatus(ctx context.Context, req *volume_server_pb.
func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_server_pb.VolumeServerStatusRequest) (*volume_server_pb.VolumeServerStatusResponse, error) {
- resp := &volume_server_pb.VolumeServerStatusResponse{}
+ resp := &volume_server_pb.VolumeServerStatusResponse{
+ MemoryStatus: stats.MemStat(),
+ Version: util.Version(),
+ DataCenter: vs.dataCenter,
+ Rack: vs.rack,
+ }
for _, loc := range vs.store.Locations {
if dir, e := filepath.Abs(loc.Directory); e == nil {
@@ -191,8 +202,6 @@ func (vs *VolumeServer) VolumeServerStatus(ctx context.Context, req *volume_serv
}
}
- resp.MemoryStatus = stats.MemStat()
-
return resp, nil
}
@@ -247,3 +256,41 @@ func (vs *VolumeServer) VolumeNeedleStatus(ctx context.Context, req *volume_serv
return resp, nil
}
+
+func (vs *VolumeServer) Ping(ctx context.Context, req *volume_server_pb.PingRequest) (resp *volume_server_pb.PingResponse, pingErr error) {
+ resp = &volume_server_pb.PingResponse{
+ StartTimeNs: time.Now().UnixNano(),
+ }
+ if req.TargetType == cluster.FilerType {
+ pingErr = pb.WithFilerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ pingResp, err := client.Ping(ctx, &filer_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if req.TargetType == cluster.VolumeServerType {
+ pingErr = pb.WithVolumeServerClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ pingResp, err := client.Ping(ctx, &volume_server_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if req.TargetType == cluster.MasterType {
+ pingErr = pb.WithMasterClient(false, pb.ServerAddress(req.Target), vs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ pingResp, err := client.Ping(ctx, &master_pb.PingRequest{})
+ if pingResp != nil {
+ resp.RemoteTimeNs = pingResp.StartTimeNs
+ }
+ return err
+ })
+ }
+ if pingErr != nil {
+ pingErr = fmt.Errorf("ping %s %s: %v", req.TargetType, req.Target, pingErr)
+ }
+ resp.StopTimeNs = time.Now().UnixNano()
+ return
+}
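
Ping returns three timestamps: local send time, the target's clock, and local receive time. That is enough for an NTP-style estimate of round-trip time and clock offset; a sketch of the arithmetic:

package pingsketch

import "time"

// analyzePing derives round-trip time and an estimated clock offset from the
// three Ping timestamps, assuming the remote clock was read roughly halfway
// through the round trip.
func analyzePing(startNs, remoteNs, stopNs int64) (rtt, offset time.Duration) {
	rtt = time.Duration(stopNs - startNs)
	midpoint := startNs + (stopNs-startNs)/2
	offset = time.Duration(remoteNs - midpoint)
	return rtt, offset
}
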
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index f8875169f..078b78eb2 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -2,9 +2,11 @@ package weed_server
import (
"fmt"
- "github.com/chrislusf/seaweedfs/weed/operation"
+ "os"
"time"
+ "github.com/chrislusf/seaweedfs/weed/operation"
+
"google.golang.org/grpc"
"github.com/chrislusf/seaweedfs/weed/pb"
@@ -19,15 +21,14 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (vs *VolumeServer) GetMaster() string {
+func (vs *VolumeServer) GetMaster() pb.ServerAddress {
return vs.currentMaster
}
func (vs *VolumeServer) checkWithMaster() (err error) {
- isConnected := false
- for !isConnected {
+ for {
for _, master := range vs.SeedMasterNodes {
- err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
+ err = operation.WithMasterServerClient(false, master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
if err != nil {
return fmt.Errorf("get master %s configuration: %v", master, err)
@@ -44,7 +45,6 @@ func (vs *VolumeServer) checkWithMaster() (err error) {
}
time.Sleep(1790 * time.Millisecond)
}
- return
}
func (vs *VolumeServer) heartbeat() {
@@ -56,7 +56,7 @@ func (vs *VolumeServer) heartbeat() {
grpcDialOption := security.LoadClientTLS(util.GetViper(), "grpc.volume")
var err error
- var newLeader string
+ var newLeader pb.ServerAddress
for vs.isHeartbeating {
for _, master := range vs.SeedMasterNodes {
if newLeader != "" {
@@ -65,13 +65,8 @@ func (vs *VolumeServer) heartbeat() {
time.Sleep(3 * time.Second)
master = newLeader
}
- masterGrpcAddress, parseErr := pb.ParseServerToGrpcAddress(master)
- if parseErr != nil {
- glog.V(0).Infof("failed to parse master grpc %v: %v", masterGrpcAddress, parseErr)
- continue
- }
vs.store.MasterAddress = master
- newLeader, err = vs.doHeartbeat(master, masterGrpcAddress, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
+ newLeader, err = vs.doHeartbeat(master, grpcDialOption, time.Duration(vs.pulseSeconds)*time.Second)
if err != nil {
glog.V(0).Infof("heartbeat error: %v", err)
time.Sleep(time.Duration(vs.pulseSeconds) * time.Second)
@@ -94,25 +89,25 @@ func (vs *VolumeServer) StopHeartbeat() (isAlreadyStopping bool) {
return false
}
-func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader string, err error) {
+func (vs *VolumeServer) doHeartbeat(masterAddress pb.ServerAddress, grpcDialOption grpc.DialOption, sleepInterval time.Duration) (newLeader pb.ServerAddress, err error) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
- grpcConection, err := pb.GrpcDial(ctx, masterGrpcAddress, grpcDialOption)
+ grpcConection, err := pb.GrpcDial(ctx, masterAddress.ToGrpcAddress(), grpcDialOption)
if err != nil {
- return "", fmt.Errorf("fail to dial %s : %v", masterNode, err)
+ return "", fmt.Errorf("fail to dial %s : %v", masterAddress, err)
}
defer grpcConection.Close()
client := master_pb.NewSeaweedClient(grpcConection)
stream, err := client.SendHeartbeat(ctx)
if err != nil {
- glog.V(0).Infof("SendHeartbeat to %s: %v", masterNode, err)
+ glog.V(0).Infof("SendHeartbeat to %s: %v", masterAddress, err)
return "", err
}
- glog.V(0).Infof("Heartbeat to: %v", masterNode)
- vs.currentMaster = masterNode
+ glog.V(0).Infof("Heartbeat to: %v", masterAddress)
+ vs.currentMaster = masterAddress
doneChan := make(chan error, 1)
@@ -123,17 +118,30 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
doneChan <- err
return
}
+ if len(in.DuplicatedUuids) > 0 {
+ var duplicateDir []string
+ for _, loc := range vs.store.Locations {
+ for _, uuid := range in.DuplicatedUuids {
+ if uuid == loc.DirectoryUuid {
+ duplicateDir = append(duplicateDir, loc.Directory)
+ }
+ }
+ }
+ glog.Errorf("Shut down Volume Server due to duplicate volume directories: %v", duplicateDir)
+ os.Exit(1)
+ }
if in.GetVolumeSizeLimit() != 0 && vs.store.GetVolumeSizeLimit() != in.GetVolumeSizeLimit() {
vs.store.SetVolumeSizeLimit(in.GetVolumeSizeLimit())
if vs.store.MaybeAdjustVolumeMax() {
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
+ return
}
}
}
- if in.GetLeader() != "" && vs.currentMaster != in.GetLeader() {
+ if in.GetLeader() != "" && string(vs.currentMaster) != in.GetLeader() {
glog.V(0).Infof("Volume Server found a new master newLeader: %v instead of %v", in.GetLeader(), vs.currentMaster)
- newLeader = in.GetLeader()
+ newLeader = pb.ServerAddress(in.GetLeader())
doneChan <- nil
return
}
@@ -141,12 +149,12 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
}()
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
return "", err
}
if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
return "", err
}
@@ -161,9 +169,9 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
&volumeMessage,
},
}
- glog.V(1).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
+ glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
if err = stream.Send(deltaBeat); err != nil {
- glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
case ecShardMessage := <-vs.store.NewEcShardsChan:
@@ -172,10 +180,10 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
&ecShardMessage,
},
}
- glog.V(1).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
+ glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
if err = stream.Send(deltaBeat); err != nil {
- glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
case volumeMessage := <-vs.store.DeletedVolumesChan:
@@ -184,9 +192,9 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
&volumeMessage,
},
}
- glog.V(1).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
+ glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
if err = stream.Send(deltaBeat); err != nil {
- glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
case ecShardMessage := <-vs.store.DeletedEcShardsChan:
@@ -195,23 +203,23 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
&ecShardMessage,
},
}
- glog.V(1).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
+ glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
if err = stream.Send(deltaBeat); err != nil {
- glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
case <-volumeTickChan:
glog.V(4).Infof("volume server %s:%d heartbeat", vs.store.Ip, vs.store.Port)
vs.store.MaybeAdjustVolumeMax()
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
return "", err
}
case <-ecShardTickChan:
glog.V(4).Infof("volume server %s:%d ec heartbeat", vs.store.Ip, vs.store.Port)
if err = stream.Send(vs.store.CollectErasureCodingHeartbeat()); err != nil {
- glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", masterAddress, err)
return "", err
}
case err = <-doneChan:
@@ -230,7 +238,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
}
glog.V(1).Infof("volume server %s:%d stops and deletes all volumes", vs.store.Ip, vs.store.Port)
if err = stream.Send(emptyBeat); err != nil {
- glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
+ glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterAddress, err)
return "", err
}
return
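
The heartbeat loop now carries pb.ServerAddress end to end: it cycles through the seed masters, and whenever a master answers with a different leader, doHeartbeat returns that address so the next iteration dials the leader directly. The same control flow in isolation, with heartbeat standing in for doHeartbeat:

package heartbeatsketch

import "time"

// followLeader mirrors the outer heartbeat loop: iterate over seed masters,
// but once any of them points at a leader, talk to the leader instead.
func followLeader(seeds []string, heartbeat func(master string) (newLeader string, err error)) {
	var leader string
	for {
		for _, master := range seeds {
			if leader != "" {
				time.Sleep(3 * time.Second) // brief pause before switching, as in the real loop
				master = leader
			}
			next, err := heartbeat(master)
			if err != nil {
				time.Sleep(time.Second)
				continue
			}
			if next != "" {
				leader = next // redirect takes priority over the seed list
			}
		}
	}
}
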
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 2ad77a7ff..b4bc850e2 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -3,26 +3,28 @@ package weed_server
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/backend"
"io"
- "io/ioutil"
"math"
"os"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"github.com/chrislusf/seaweedfs/weed/util"
)
const BufferSizeLimit = 1024 * 1024 * 2
// VolumeCopy copy the .idx .dat .vif files, and mount the volume
-func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.VolumeCopyRequest) (*volume_server_pb.VolumeCopyResponse, error) {
+func (vs *VolumeServer) VolumeCopy(req *volume_server_pb.VolumeCopyRequest, stream volume_server_pb.VolumeServer_VolumeCopyServer) error {
v := vs.store.GetVolume(needle.VolumeId(req.VolumeId))
if v != nil {
@@ -31,7 +33,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
err := vs.store.DeleteVolume(needle.VolumeId(req.VolumeId))
if err != nil {
- return nil, fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
+ return fmt.Errorf("failed to delete existing volume %d: %v", req.VolumeId, err)
}
glog.V(0).Infof("deleted existing volume %d before copying.", req.VolumeId)
@@ -45,7 +47,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
// confirm size and timestamp
var volFileInfoResp *volume_server_pb.ReadVolumeFileStatusResponse
var dataBaseFileName, indexBaseFileName, idxFileName, datFileName string
- err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
var err error
volFileInfoResp, err = client.ReadVolumeFileStatus(context.Background(),
&volume_server_pb.ReadVolumeFileStatusRequest{
@@ -67,7 +69,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
dataBaseFileName = storage.VolumeFileName(location.Directory, volFileInfoResp.Collection, int(req.VolumeId))
indexBaseFileName = storage.VolumeFileName(location.IdxDirectory, volFileInfoResp.Collection, int(req.VolumeId))
- ioutil.WriteFile(dataBaseFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755)
+ util.WriteFile(dataBaseFileName+".note", []byte(fmt.Sprintf("copying from %s", req.SourceDataNode)), 0755)
defer func() {
if err != nil {
@@ -78,18 +80,66 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}
}()
+ var preallocateSize int64
+ if grpcErr := pb.WithMasterClient(false, vs.GetMaster(), vs.grpcDialOption, func(client master_pb.SeaweedClient) error {
+ resp, err := client.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
+ if err != nil {
+ return fmt.Errorf("get master %s configuration: %v", vs.GetMaster(), err)
+ }
+ if resp.VolumePreallocate {
+ preallocateSize = int64(resp.VolumeSizeLimitMB) * (1 << 20)
+ }
+ return nil
+ }); grpcErr != nil {
+ glog.V(0).Infof("connect to %s: %v", vs.GetMaster(), grpcErr)
+ }
+
+ if preallocateSize > 0 {
+ volumeFile := dataBaseFileName + ".dat"
+ _, err := backend.CreateVolumeFile(volumeFile, preallocateSize, 0)
+ if err != nil {
+ return fmt.Errorf("create volume file %s: %v", volumeFile, err)
+ }
+ }
+
// println("source:", volFileInfoResp.String())
- if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil {
+ copyResponse := &volume_server_pb.VolumeCopyResponse{}
+ reportInterval := int64(1024 * 1024 * 128)
+ nextReportTarget := reportInterval
+ var modifiedTsNs int64
+ var sendErr error
+ if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true, func(processed int64) bool {
+ if processed > nextReportTarget {
+ copyResponse.ProcessedBytes = processed
+ if sendErr = stream.Send(copyResponse); sendErr != nil {
+ return false
+ }
+ nextReportTarget = processed + reportInterval
+ }
+ return true
+ }); err != nil {
return err
}
+ if sendErr != nil {
+ return sendErr
+ }
+ if modifiedTsNs > 0 {
+ os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
+ }
- if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil {
+ if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false, nil); err != nil {
return err
}
+ if modifiedTsNs > 0 {
+ os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
+ }
- if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil {
+ if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true, nil); err != nil {
return err
}
+ if modifiedTsNs > 0 {
+ os.Chtimes(dataBaseFileName+".vif", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
+ }
os.Remove(dataBaseFileName + ".note")
@@ -97,10 +147,10 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
})
if err != nil {
- return nil, err
+ return err
}
if dataBaseFileName == "" {
- return nil, fmt.Errorf("not found volume %d file", req.VolumeId)
+ return fmt.Errorf("not found volume %d file", req.VolumeId)
}
idxFileName = indexBaseFileName + ".idx"
@@ -115,21 +165,25 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}()
if err = checkCopyFiles(volFileInfoResp, idxFileName, datFileName); err != nil { // added by panyc16
- return nil, err
+ return err
}
// mount the volume
err = vs.store.MountVolume(needle.VolumeId(req.VolumeId))
if err != nil {
- return nil, fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
+ return fmt.Errorf("failed to mount volume %d: %v", req.VolumeId, err)
}
- return &volume_server_pb.VolumeCopyResponse{
+ if err = stream.Send(&volume_server_pb.VolumeCopyResponse{
LastAppendAtNs: volFileInfoResp.DatFileTimestampSeconds * uint64(time.Second),
- }, err
+ }); err != nil {
+ glog.Errorf("send response: %v", err)
+ }
+
+ return err
}
-func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error {
+func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: vid,
@@ -141,15 +195,15 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
})
if err != nil {
- return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
+ return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
}
- err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend)
+ modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend, progressFn)
if err != nil {
- return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
+ return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
}
- return nil
+ return modifiedTsNs, nil
}
@@ -178,7 +232,7 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse
return nil
}
-func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error {
+func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool, progressFn storage.ProgressFunc) (modifiedTsNs int64, err error) {
glog.V(4).Infof("writing to %s", fileName)
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
if isAppend {
@@ -186,22 +240,32 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
}
dst, err := os.OpenFile(fileName, flags, 0644)
if err != nil {
- return nil
+ return modifiedTsNs, err
}
defer dst.Close()
+ var progressedBytes int64
for {
resp, receiveErr := client.Recv()
if receiveErr == io.EOF {
break
}
+ if resp != nil && resp.ModifiedTsNs != 0 {
+ modifiedTsNs = resp.ModifiedTsNs
+ }
if receiveErr != nil {
- return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
+ return modifiedTsNs, fmt.Errorf("receiving %s: %v", fileName, receiveErr)
}
dst.Write(resp.FileContent)
+ progressedBytes += int64(len(resp.FileContent))
+ if progressFn != nil {
+ if !progressFn(progressedBytes) {
+ return modifiedTsNs, fmt.Errorf("interrupted copy operation")
+ }
+ }
wt.MaybeSlowdown(int64(len(resp.FileContent)))
}
- return nil
+ return modifiedTsNs, nil
}
func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
@@ -239,6 +303,7 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
if uint32(v.CompactionRevision) != req.CompactionRevision && req.CompactionRevision != math.MaxUint32 {
return fmt.Errorf("volume %d is compacted", req.VolumeId)
}
+ v.SyncToDisk()
fileName = v.FileName(req.Ext)
} else {
baseFileName := erasure_coding.EcShardBaseFileName(req.Collection, int(req.VolumeId)) + req.Ext
@@ -271,6 +336,12 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
}
defer file.Close()
+ fileInfo, err := file.Stat()
+ if err != nil {
+ return err
+ }
+ fileModTsNs := fileInfo.ModTime().UnixNano()
+
buffer := make([]byte, BufferSizeLimit)
for bytesToRead > 0 {
@@ -290,12 +361,14 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
bytesread = int(bytesToRead)
}
err = stream.Send(&volume_server_pb.CopyFileResponse{
- FileContent: buffer[:bytesread],
+ FileContent: buffer[:bytesread],
+ ModifiedTsNs: fileModTsNs,
})
if err != nil {
// println("sending", bytesread, "bytes err", err.Error())
return err
}
+ fileModTsNs = 0 // only send once
bytesToRead -= int64(bytesread)
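
VolumeCopy is now a server-streaming RPC: the caller sees periodic `ProcessedBytes` updates (roughly every 128 MB) followed by a final message carrying `LastAppendAtNs`. A minimal client-side sketch of draining that stream, assuming only the generated `volume_server_pb` bindings used in this diff and an already-dialed `*grpc.ClientConn`:

```go
// Sketch: drain the streaming VolumeCopy RPC. Assumes an already-dialed
// *grpc.ClientConn; field names follow the volume_server_pb messages in this diff.
package main

import (
	"context"
	"io"
	"log"

	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
	"google.golang.org/grpc"
)

func copyVolumeWithProgress(ctx context.Context, conn *grpc.ClientConn, req *volume_server_pb.VolumeCopyRequest) (uint64, error) {
	client := volume_server_pb.NewVolumeServerClient(conn)
	stream, err := client.VolumeCopy(ctx, req)
	if err != nil {
		return 0, err
	}
	var lastAppendAtNs uint64
	for {
		resp, recvErr := stream.Recv()
		if recvErr == io.EOF {
			return lastAppendAtNs, nil // server closed the stream
		}
		if recvErr != nil {
			return 0, recvErr
		}
		if resp.ProcessedBytes > 0 {
			log.Printf("volume %d: %d bytes copied", req.VolumeId, resp.ProcessedBytes)
		}
		if resp.LastAppendAtNs > 0 {
			lastAppendAtNs = resp.LastAppendAtNs // final message
		}
	}
}
```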
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index d7e4f302a..79611f499 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -3,9 +3,7 @@ package weed_server
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/storage/volume_info"
"io"
- "io/ioutil"
"math"
"os"
"path"
@@ -13,11 +11,13 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/storage/volume_info"
"github.com/chrislusf/seaweedfs/weed/util"
)
@@ -49,6 +49,17 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
}
+ shouldCleanup := true
+ defer func() {
+ if !shouldCleanup {
+ return
+ }
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ os.Remove(fmt.Sprintf("%s.ec%2d", baseFileName, i))
+ }
+ os.Remove(v.IndexFileName() + ".ecx")
+ }()
+
// write .ec00 ~ .ec13 files
if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
@@ -64,6 +75,8 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
}
+ shouldCleanup = false
+
return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
}
@@ -113,11 +126,11 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
dataBaseFileName := storage.VolumeFileName(location.Directory, req.Collection, int(req.VolumeId))
indexBaseFileName := storage.VolumeFileName(location.IdxDirectory, req.Collection, int(req.VolumeId))
- err := operation.WithVolumeServerClient(req.SourceDataNode, vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
+ err := operation.WithVolumeServerClient(true, pb.ServerAddress(req.SourceDataNode), vs.grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
// copy ec data slices
for _, shardId := range req.ShardIds {
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
+ if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false, nil); err != nil {
return err
}
}
@@ -125,7 +138,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
if req.CopyEcxFile {
// copy ecx file
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil {
+ if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false, nil); err != nil {
return err
}
return nil
@@ -133,14 +146,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
if req.CopyEcjFile {
// copy ecj file
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil {
+ if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true, nil); err != nil {
return err
}
}
if req.CopyVifFile {
// copy vif file
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil {
+ if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true, nil); err != nil {
return err
}
}
@@ -186,12 +199,12 @@ func (vs *VolumeServer) VolumeEcShardsDelete(ctx context.Context, req *volume_se
existingShardCount := 0
for _, location := range vs.store.Locations {
- fileInfos, err := ioutil.ReadDir(location.Directory)
+ fileInfos, err := os.ReadDir(location.Directory)
if err != nil {
continue
}
if location.IdxDirectory != location.Directory {
- idxFileInfos, err := ioutil.ReadDir(location.IdxDirectory)
+ idxFileInfos, err := os.ReadDir(location.IdxDirectory)
if err != nil {
continue
}
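
The new cleanup logic in VolumeEcShardsGenerate follows an arm/disarm idiom: the deferred cleanup is registered before any shard is written and only disarmed once every step succeeds, so a failure midway leaves no partial `.ec*`/`.ecx` files behind. A distilled sketch of the idiom, with hypothetical step functions:

```go
// Distilled arm/disarm cleanup idiom from VolumeEcShardsGenerate above;
// the step functions are hypothetical stand-ins for WriteEcFiles etc.
package main

import (
	"fmt"
	"os"
)

func generateWithCleanup(baseFileName string, steps []func() error) (err error) {
	shouldCleanup := true
	defer func() {
		if !shouldCleanup {
			return
		}
		for i := 0; i < 14; i++ { // erasure_coding.TotalShardsCount
			os.Remove(fmt.Sprintf("%s.ec%02d", baseFileName, i))
		}
		os.Remove(baseFileName + ".ecx")
	}()
	for _, step := range steps {
		if err = step(); err != nil {
			return err // cleanup still armed: partial shards are removed
		}
	}
	shouldCleanup = false // success: keep the generated files
	return nil
}
```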
diff --git a/weed/server/volume_grpc_read_all.go b/weed/server/volume_grpc_read_all.go
new file mode 100644
index 000000000..7fe5bad03
--- /dev/null
+++ b/weed/server/volume_grpc_read_all.go
@@ -0,0 +1,36 @@
+package weed_server
+
+import (
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+)
+
+func (vs *VolumeServer) ReadAllNeedles(req *volume_server_pb.ReadAllNeedlesRequest, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer) error {
+
+ for _, vid := range req.VolumeIds {
+ if err := vs.streamReadOneVolume(needle.VolumeId(vid), stream); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (vs *VolumeServer) streamReadOneVolume(vid needle.VolumeId, stream volume_server_pb.VolumeServer_ReadAllNeedlesServer) (err error) {
+ v := vs.store.GetVolume(vid)
+ if v == nil {
+ return fmt.Errorf("not found volume id %d", vid)
+ }
+
+ scanner := &storage.VolumeFileScanner4ReadAll{
+ Stream: stream,
+ V: v,
+ }
+
+ offset := int64(v.SuperBlock.BlockSize())
+
+ err = storage.ScanVolumeFileFrom(v.Version(), v.DataBackend, offset, scanner)
+
+ return err
+}
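
ReadAllNeedles scans each requested volume from just past the super block and streams every needle back to the caller. A minimal consumer sketch, assuming the generated client; the per-needle response fields are not shown in this hunk, so the sketch only counts messages:

```go
// Sketch: consume the ReadAllNeedles stream. The per-needle response
// fields are not shown in this hunk, so this only counts messages.
package main

import (
	"context"
	"io"

	"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
)

func drainReadAllNeedles(ctx context.Context, client volume_server_pb.VolumeServerClient, volumeIds []uint32) (int, error) {
	stream, err := client.ReadAllNeedles(ctx, &volume_server_pb.ReadAllNeedlesRequest{VolumeIds: volumeIds})
	if err != nil {
		return 0, err
	}
	count := 0
	for {
		if _, recvErr := stream.Recv(); recvErr != nil {
			if recvErr == io.EOF {
				return count, nil // all volumes scanned
			}
			return count, recvErr
		}
		count++ // one message per needle
	}
}
```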
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
index 0615a96a1..9114db329 100644
--- a/weed/server/volume_grpc_remote.go
+++ b/weed/server/volume_grpc_remote.go
@@ -3,10 +3,14 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "sync"
+ "time"
)
func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) {
@@ -30,16 +34,50 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr)
}
- n := new(needle.Needle)
- n.Id = types.NeedleId(req.NeedleId)
- n.Cookie = types.Cookie(req.Cookie)
- n.Data, n.DataSize = data, uint32(len(data))
- // copied from *Needle.prepareWriteBuffer()
- n.Size = 4 + types.Size(n.DataSize) + 1
- n.Checksum = needle.NewCRC(n.Data)
- if _, err = vs.store.WriteVolumeNeedle(v.Id, n, true, false); err != nil {
- return nil, fmt.Errorf("write needle %d size %d: %v", req.NeedleId, req.Size, err)
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ n := new(needle.Needle)
+ n.Id = types.NeedleId(req.NeedleId)
+ n.Cookie = types.Cookie(req.Cookie)
+ n.Data, n.DataSize = data, uint32(len(data))
+ // copied from *Needle.prepareWriteBuffer()
+ n.Size = 4 + types.Size(n.DataSize) + 1
+ n.Checksum = needle.NewCRC(n.Data)
+ n.LastModified = uint64(time.Now().Unix())
+ n.SetHasLastModifiedDate()
+ if _, localWriteErr := vs.store.WriteVolumeNeedle(v.Id, n, true, false); localWriteErr != nil {
+ if err == nil {
+ err = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, err)
+ }
+ }
+ }()
+ if len(req.Replicas) > 0 {
+ fileId := needle.NewFileId(v.Id, req.NeedleId, req.Cookie)
+ for _, replica := range req.Replicas {
+ wg.Add(1)
+ go func(targetVolumeServer string) {
+ defer wg.Done()
+ uploadOption := &operation.UploadOption{
+ UploadUrl: fmt.Sprintf("http://%s/%s?type=replicate", targetVolumeServer, fileId.String()),
+ Filename: "",
+ Cipher: false,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: security.EncodedJwt(req.Auth),
+ }
+ if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil {
+ if err == nil {
+ err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err)
+ }
+ }
+ }(replica.Url)
+ }
}
- return resp, nil
+ wg.Wait()
+
+ return resp, err
}
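
FetchAndWriteNeedle now writes the fetched data locally and to each replica concurrently under a `sync.WaitGroup`. Note that the goroutines above assign to the shared `err` without synchronization; a mutex-guarded variant of the same fan-out pattern looks like this, with hypothetical writer funcs standing in for the local needle write and the per-replica uploads:

```go
// Sketch: the fan-out write pattern above, with the shared error guarded
// by a mutex (the goroutines in this hunk assign err without one).
package main

import "sync"

func fanOutWrite(data []byte, writers []func([]byte) error) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	for _, write := range writers {
		wg.Add(1)
		go func(write func([]byte) error) {
			defer wg.Done()
			if writeErr := write(data); writeErr != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = writeErr // keep only the first failure
				}
				mu.Unlock()
			}
		}(write)
	}
	wg.Wait()
	return firstErr
}
```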
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index 3ea902ed3..4022da44a 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/pb"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
@@ -89,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
- return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
+ return resp, operation.TailVolumeFromSource(pb.ServerAddress(req.SourceVolumeServer), vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
_, err := vs.store.WriteVolumeNeedle(v.Id, n, false, false)
return err
})
diff --git a/weed/server/volume_grpc_tier_upload.go b/weed/server/volume_grpc_tier_upload.go
index e51de5f1d..c690de959 100644
--- a/weed/server/volume_grpc_tier_upload.go
+++ b/weed/server/volume_grpc_tier_upload.go
@@ -27,7 +27,7 @@ func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTi
// locate the disk file
diskFile, ok := v.DataBackend.(*backend.DiskFile)
if !ok {
- return fmt.Errorf("volume %d is not on local disk", req.VolumeId)
+ return nil // already copied to remote. fmt.Errorf("volume %d is not on local disk", req.VolumeId)
}
// check valid storage backend type
@@ -62,13 +62,8 @@ func (vs *VolumeServer) VolumeTierMoveDatToRemote(req *volume_server_pb.VolumeTi
})
}
- // remember the file original source
- attributes := make(map[string]string)
- attributes["volumeId"] = v.Id.String()
- attributes["collection"] = v.Collection
- attributes["ext"] = ".dat"
// copy the data file
- key, size, err := backendStorage.CopyFile(diskFile.File, attributes, fn)
+ key, size, err := backendStorage.CopyFile(diskFile.File, fn)
if err != nil {
return fmt.Errorf("backend %s copy file %s: %v", req.DestinationBackendName, diskFile.Name(), err)
}
diff --git a/weed/server/volume_grpc_vacuum.go b/weed/server/volume_grpc_vacuum.go
index f8d1b7fda..0ab782b02 100644
--- a/weed/server/volume_grpc_vacuum.go
+++ b/weed/server/volume_grpc_vacuum.go
@@ -24,19 +24,35 @@ func (vs *VolumeServer) VacuumVolumeCheck(ctx context.Context, req *volume_serve
}
-func (vs *VolumeServer) VacuumVolumeCompact(ctx context.Context, req *volume_server_pb.VacuumVolumeCompactRequest) (*volume_server_pb.VacuumVolumeCompactResponse, error) {
+func (vs *VolumeServer) VacuumVolumeCompact(req *volume_server_pb.VacuumVolumeCompactRequest, stream volume_server_pb.VolumeServer_VacuumVolumeCompactServer) error {
resp := &volume_server_pb.VacuumVolumeCompactResponse{}
-
- err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond)
+ reportInterval := int64(1024 * 1024 * 128)
+ nextReportTarget := reportInterval
+
+ var sendErr error
+ err := vs.store.CompactVolume(needle.VolumeId(req.VolumeId), req.Preallocate, vs.compactionBytePerSecond, func(processed int64) bool {
+ if processed > nextReportTarget {
+ resp.ProcessedBytes = processed
+ if sendErr = stream.Send(resp); sendErr != nil {
+ return false
+ }
+ nextReportTarget = processed + reportInterval
+ }
+ return true
+ })
if err != nil {
glog.Errorf("compact volume %d: %v", req.VolumeId, err)
- } else {
- glog.V(1).Infof("compact volume %d", req.VolumeId)
+ return err
+ }
+ if sendErr != nil {
+ glog.Errorf("compact volume %d report progress: %v", req.VolumeId, sendErr)
+ return sendErr
}
- return resp, err
+ glog.V(1).Infof("compact volume %d", req.VolumeId)
+ return nil
}
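
VacuumVolumeCompact adopts the same progress-streaming shape as VolumeCopy: the store calls back with a running byte count, the handler forwards a response roughly every 128 MB, and returning false from the callback aborts the compaction when a send fails. The throttling logic, distilled into a reusable sketch:

```go
// Sketch of the throttled progress callback shared by VolumeCopy and
// VacuumVolumeCompact: report roughly every reportInterval bytes;
// returning false aborts the underlying copy/compaction when a report
// (here, a stream.Send) fails.
package main

func makeProgressFn(reportInterval int64, report func(processed int64) error) func(int64) bool {
	nextReportTarget := reportInterval
	return func(processed int64) bool {
		if processed > nextReportTarget {
			if err := report(processed); err != nil {
				return false // stop the operation; caller surfaces the send error
			}
			nextReportTarget = processed + reportInterval
		}
		return true
	}
}
```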
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 034521b4b..abb30229a 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -1,10 +1,13 @@
package weed_server
import (
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/storage/types"
"net/http"
"sync"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/chrislusf/seaweedfs/weed/storage/types"
"google.golang.org/grpc"
@@ -17,15 +20,17 @@ import (
)
type VolumeServer struct {
+ volume_server_pb.UnimplementedVolumeServerServer
inFlightUploadDataSize int64
inFlightDownloadDataSize int64
concurrentUploadLimit int64
concurrentDownloadLimit int64
inFlightUploadDataLimitCond *sync.Cond
inFlightDownloadDataLimitCond *sync.Cond
+ inflightUploadDataTimeout time.Duration
- SeedMasterNodes []string
- currentMaster string
+ SeedMasterNodes []pb.ServerAddress
+ currentMaster pb.ServerAddress
pulseSeconds int
dataCenter string
rack string
@@ -45,11 +50,11 @@ type VolumeServer struct {
}
func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
- port int, publicUrl string,
+ port int, grpcPort int, publicUrl string,
folders []string, maxCounts []int, minFreeSpaces []util.MinFreeSpace, diskTypes []types.DiskType,
idxFolder string,
needleMapKind storage.NeedleMapKind,
- masterNodes []string, pulseSeconds int,
+ masterNodes []pb.ServerAddress, pulseSeconds int,
dataCenter string, rack string,
whiteList []string,
fixJpgOrientation bool,
@@ -58,6 +63,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
fileSizeLimitMB int,
concurrentUploadLimit int64,
concurrentDownloadLimit int64,
+ inflightUploadDataTimeout time.Duration,
) *VolumeServer {
v := util.GetViper()
@@ -86,16 +92,18 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
inFlightDownloadDataLimitCond: sync.NewCond(new(sync.Mutex)),
concurrentUploadLimit: concurrentUploadLimit,
concurrentDownloadLimit: concurrentDownloadLimit,
+ inflightUploadDataTimeout: inflightUploadDataTimeout,
}
vs.SeedMasterNodes = masterNodes
vs.checkWithMaster()
- vs.store = storage.NewStore(vs.grpcDialOption, port, ip, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes)
+ vs.store = storage.NewStore(vs.grpcDialOption, ip, port, grpcPort, publicUrl, folders, maxCounts, minFreeSpaces, idxFolder, vs.needleMapKind, diskTypes)
vs.guard = security.NewGuard(whiteList, signingKey, expiresAfterSec, readSigningKey, readExpiresAfterSec)
handleStaticResources(adminMux)
adminMux.HandleFunc("/status", vs.statusHandler)
+ adminMux.HandleFunc("/healthz", vs.healthzHandler)
if signingKey == "" || enableUiAccess {
// only expose the volume server details for safe environments
adminMux.HandleFunc("/ui/index.html", vs.uiStatusHandler)
@@ -113,7 +121,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
}
go vs.heartbeat()
- go stats.LoopPushingMetric("volumeServer", fmt.Sprintf("%s:%d", ip, port), vs.metricsAddress, vs.metricsIntervalSec)
+ go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec)
return vs
}
diff --git a/weed/server/volume_server_handlers.go b/weed/server/volume_server_handlers.go
index ff2eccc11..293f36f14 100644
--- a/weed/server/volume_server_handlers.go
+++ b/weed/server/volume_server_handlers.go
@@ -1,10 +1,12 @@
package weed_server
import (
+ "fmt"
"net/http"
"strconv"
"strings"
"sync/atomic"
+ "time"
"github.com/chrislusf/seaweedfs/weed/util"
@@ -39,8 +41,14 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
stats.ReadRequest()
vs.inFlightDownloadDataLimitCond.L.Lock()
for vs.concurrentDownloadLimit != 0 && atomic.LoadInt64(&vs.inFlightDownloadDataSize) > vs.concurrentDownloadLimit {
- glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit)
- vs.inFlightDownloadDataLimitCond.Wait()
+ select {
+ case <-r.Context().Done():
+ glog.V(4).Infof("request cancelled from %s: %v", r.RemoteAddr, r.Context().Err())
+ return
+ default:
+ glog.V(4).Infof("wait because inflight download data %d > %d", vs.inFlightDownloadDataSize, vs.concurrentDownloadLimit)
+ vs.inFlightDownloadDataLimitCond.Wait()
+ }
}
vs.inFlightDownloadDataLimitCond.L.Unlock()
vs.GetOrHeadHandler(w, r)
@@ -49,18 +57,31 @@ func (vs *VolumeServer) privateStoreHandler(w http.ResponseWriter, r *http.Reque
vs.guard.WhiteList(vs.DeleteHandler)(w, r)
case "PUT", "POST":
- // wait until in flight data is less than the limit
contentLength := getContentLength(r)
- vs.inFlightUploadDataLimitCond.L.Lock()
- for vs.concurrentUploadLimit != 0 && atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit {
- glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
- vs.inFlightUploadDataLimitCond.Wait()
+ // exclude the replication from the concurrentUploadLimitMB
+ if r.URL.Query().Get("type") != "replicate" && vs.concurrentUploadLimit != 0 {
+ startTime := time.Now()
+ vs.inFlightUploadDataLimitCond.L.Lock()
+ for atomic.LoadInt64(&vs.inFlightUploadDataSize) > vs.concurrentUploadLimit {
+ // wait timeout check
+ if startTime.Add(vs.inflightUploadDataTimeout).Before(time.Now()) {
+ vs.inFlightUploadDataLimitCond.L.Unlock()
+ err := fmt.Errorf("reject because inflight upload data %d > %d, and wait timeout", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
+ glog.V(1).Infof("too many requests: %v", err)
+ writeJsonError(w, r, http.StatusTooManyRequests, err)
+ return
+ }
+ glog.V(4).Infof("wait because inflight upload data %d > %d", vs.inFlightUploadDataSize, vs.concurrentUploadLimit)
+ vs.inFlightUploadDataLimitCond.Wait()
+ }
+ vs.inFlightUploadDataLimitCond.L.Unlock()
}
- vs.inFlightUploadDataLimitCond.L.Unlock()
atomic.AddInt64(&vs.inFlightUploadDataSize, contentLength)
defer func() {
atomic.AddInt64(&vs.inFlightUploadDataSize, -contentLength)
- vs.inFlightUploadDataLimitCond.Signal()
+ if vs.concurrentUploadLimit != 0 {
+ vs.inFlightUploadDataLimitCond.Signal()
+ }
}()
// process uploads
@@ -133,7 +154,7 @@ func (vs *VolumeServer) maybeCheckJwtAuthorization(r *http.Request, vid, fid str
return false
}
- token, err := security.DecodeJwt(signingKey, tokenStr)
+ token, err := security.DecodeJwt(signingKey, tokenStr, &security.SeaweedFileIdClaims{})
if err != nil {
glog.V(1).Infof("jwt verification error from %s: %v", r.RemoteAddr, err)
return false
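
The upload path no longer waits indefinitely under back-pressure: the cond-variable loop re-checks a deadline each time it wakes and answers HTTP 429 once `inflightUploadDataTimeout` elapses. Since `sync.Cond` has no timed wait, the deadline is only noticed when a `Signal` wakes the waiter, which is why the matching `Signal` in the deferred block matters. The wait shape, condensed:

```go
// Sketch: wait on a sync.Cond until a predicate clears or a deadline
// passes. sync.Cond has no timed wait, so the deadline is only observed
// when some Signal/Broadcast wakes the waiter.
package main

import (
	"fmt"
	"sync"
	"time"
)

func waitWithDeadline(cond *sync.Cond, deadline time.Time, blocked func() bool) error {
	cond.L.Lock()
	defer cond.L.Unlock()
	for blocked() {
		if time.Now().After(deadline) {
			return fmt.Errorf("wait timeout") // handler maps this to HTTP 429
		}
		cond.Wait()
	}
	return nil
}
```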
diff --git a/weed/server/volume_server_handlers_admin.go b/weed/server/volume_server_handlers_admin.go
index 7e6c06871..37cf109e2 100644
--- a/weed/server/volume_server_handlers_admin.go
+++ b/weed/server/volume_server_handlers_admin.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/topology"
"net/http"
"path/filepath"
@@ -9,6 +10,24 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
+func (vs *VolumeServer) healthzHandler(w http.ResponseWriter, r *http.Request) {
+ w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
+ volumeInfos := vs.store.VolumeInfos()
+ for _, vinfo := range volumeInfos {
+ if len(vinfo.Collection) == 0 {
+ continue
+ }
+ if vinfo.ReplicaPlacement.GetCopyCount() > 1 {
+ _, err := topology.GetWritableRemoteReplications(vs.store, vs.grpcDialOption, vinfo.Id, vs.GetMaster)
+ if err != nil {
+ w.WriteHeader(http.StatusServiceUnavailable)
+ return
+ }
+ }
+ }
+ w.WriteHeader(http.StatusOK)
+}
+
func (vs *VolumeServer) statusHandler(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Server", "SeaweedFS Volume "+util.VERSION)
m := make(map[string]interface{})
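
The new `/healthz` endpoint reports 503 whenever a collection-scoped volume with replication > 1 currently has no writable remote replicas, which makes it suitable for load-balancer or readiness probes. A probe sketch (the address is a placeholder):

```go
// Sketch: probe the new /healthz endpoint; 503 means some replicated
// collection volume currently has no writable remote replicas.
package main

import "net/http"

func volumeServerHealthy(addr string) (bool, error) {
	resp, err := http.Get("http://" + addr + "/healthz")
	if err != nil {
		return false, err
	}
	defer resp.Body.Close()
	return resp.StatusCode == http.StatusOK, nil
}
```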
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 5d12108d3..eb5b2be5a 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -6,6 +6,7 @@ import (
"errors"
"fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "github.com/chrislusf/seaweedfs/weed/util/mem"
"io"
"mime"
"net/http"
@@ -29,8 +30,6 @@ var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
- glog.V(9).Info(r.Method + " " + r.URL.Path + " " + r.Header.Get("Range"))
-
stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
start := time.Now()
defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
@@ -103,7 +102,9 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
w.WriteHeader(response.StatusCode)
- io.Copy(w, response.Body)
+ buf := mem.Allocate(128 * 1024)
+ defer mem.Free(buf)
+ io.CopyBuffer(w, response.Body, buf)
return
} else {
// redirect
@@ -126,6 +127,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
var count int
var needleSize types.Size
+ readOption.AttemptMetaOnly, readOption.MustMetaOnly = shouldAttemptStreamWrite(hasVolume, ext, r)
onReadSizeFn := func(size types.Size) {
needleSize = size
atomic.AddInt64(&vs.inFlightDownloadDataSize, int64(needleSize))
@@ -140,7 +142,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
vs.inFlightDownloadDataLimitCond.Signal()
}()
- if err != nil && err != storage.ErrorDeleted && r.FormValue("type") != "replicate" && hasVolume {
+ if err != nil && err != storage.ErrorDeleted && hasVolume {
glog.V(4).Infof("read needle: %v", err)
// start to fix it from other replicas, if not deleted and hasVolume and is not a replicated request
}
@@ -217,11 +219,31 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
}
}
- rs := conditionallyResizeImages(bytes.NewReader(n.Data), ext, r)
+ if !readOption.IsMetaOnly {
+ rs := conditionallyResizeImages(bytes.NewReader(n.Data), ext, r)
+ if e := writeResponseContent(filename, mtype, rs, w, r); e != nil {
+ glog.V(2).Infoln("response write error:", e)
+ }
+ } else {
+ vs.streamWriteResponseContent(filename, mtype, volumeId, n, w, r, readOption)
+ }
+}
- if e := writeResponseContent(filename, mtype, rs, w, r); e != nil {
- glog.V(2).Infoln("response write error:", e)
+func shouldAttemptStreamWrite(hasLocalVolume bool, ext string, r *http.Request) (shouldAttempt bool, mustMetaOnly bool) {
+ if !hasLocalVolume {
+ return false, false
+ }
+ if len(ext) > 0 {
+ ext = strings.ToLower(ext)
}
+ if r.Method == "HEAD" {
+ return true, true
+ }
+ _, _, _, shouldResize := shouldResizeImages(ext, r)
+ if shouldResize {
+ return false, false
+ }
+ return true, false
}
func (vs *VolumeServer) tryHandleChunkedFile(n *needle.Needle, fileName string, ext string, w http.ResponseWriter, r *http.Request) (processed bool) {
@@ -301,7 +323,7 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
}
w.Header().Set("Accept-Ranges", "bytes")
- adjustHeaderContentDisposition(w, r, filename)
+ adjustPassthroughHeaders(w, r, filename)
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
@@ -317,3 +339,27 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
})
return nil
}
+
+func (vs *VolumeServer) streamWriteResponseContent(filename string, mimeType string, volumeId needle.VolumeId, n *needle.Needle, w http.ResponseWriter, r *http.Request, readOption *storage.ReadOption) {
+ totalSize := int64(n.DataSize)
+ if mimeType == "" {
+ if ext := filepath.Ext(filename); ext != "" {
+ mimeType = mime.TypeByExtension(ext)
+ }
+ }
+ if mimeType != "" {
+ w.Header().Set("Content-Type", mimeType)
+ }
+ w.Header().Set("Accept-Ranges", "bytes")
+ adjustPassthroughHeaders(w, r, filename)
+
+ if r.Method == "HEAD" {
+ w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
+ return
+ }
+
+ processRangeRequest(r, w, totalSize, mimeType, func(writer io.Writer, offset int64, size int64) error {
+ return vs.store.ReadVolumeNeedleDataInto(volumeId, n, readOption, writer, offset, size)
+ })
+
+}
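
For meta-only reads (HEAD requests, or GETs that need no image resizing), the handler now defers data I/O to `processRangeRequest`, whose callback receives (writer, offset, size) per requested range and streams needle data straight from the store. The callback's shape, sketched against a plain `io.ReaderAt` instead of the volume store:

```go
// Sketch: the shape of the range callback handed to processRangeRequest,
// backed here by a plain io.ReaderAt instead of the volume store.
package main

import "io"

func rangeWriterFromReaderAt(src io.ReaderAt) func(w io.Writer, offset, size int64) error {
	return func(w io.Writer, offset, size int64) error {
		// Stream exactly [offset, offset+size) into the response writer.
		_, err := io.Copy(w, io.NewSectionReader(src, offset, size))
		return err
	}
}
```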
diff --git a/weed/server/volume_server_handlers_ui.go b/weed/server/volume_server_handlers_ui.go
index 437e5c45d..2c420c2d6 100644
--- a/weed/server/volume_server_handlers_ui.go
+++ b/weed/server/volume_server_handlers_ui.go
@@ -1,6 +1,7 @@
package weed_server
import (
+ "github.com/chrislusf/seaweedfs/weed/pb"
"net/http"
"path/filepath"
"time"
@@ -35,7 +36,7 @@ func (vs *VolumeServer) uiStatusHandler(w http.ResponseWriter, r *http.Request)
}
args := struct {
Version string
- Masters []string
+ Masters []pb.ServerAddress
Volumes interface{}
EcVolumes interface{}
RemoteVolumes interface{}
diff --git a/weed/server/volume_server_tcp_handlers_write.go b/weed/server/volume_server_tcp_handlers_write.go
index a009611da..24ad916e6 100644
--- a/weed/server/volume_server_tcp_handlers_write.go
+++ b/weed/server/volume_server_tcp_handlers_write.go
@@ -3,12 +3,13 @@ package weed_server
import (
"bufio"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
- "github.com/chrislusf/seaweedfs/weed/storage/needle"
- "github.com/chrislusf/seaweedfs/weed/util"
"io"
"net"
"strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
func (vs *VolumeServer) HandleTcpConnection(c net.Conn) {
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 68c1f3233..265dea03a 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -5,7 +5,6 @@ import (
"context"
"fmt"
"io"
- "math"
"os"
"path"
"strings"
@@ -27,19 +26,18 @@ import (
)
type WebDavOption struct {
- Filer string
- FilerGrpcAddress string
- DomainName string
- BucketsPath string
- GrpcDialOption grpc.DialOption
- Collection string
- Replication string
- DiskType string
- Uid uint32
- Gid uint32
- Cipher bool
- CacheDir string
- CacheSizeMB int64
+ Filer pb.ServerAddress
+ DomainName string
+ BucketsPath string
+ GrpcDialOption grpc.DialOption
+ Collection string
+ Replication string
+ DiskType string
+ Uid uint32
+ Gid uint32
+ Cipher bool
+ CacheDir string
+ CacheSizeMB int64
}
type WebDavServer struct {
@@ -107,7 +105,7 @@ type WebDavFile struct {
func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
- cacheUniqueId := util.Md5String([]byte("webdav" + option.FilerGrpcAddress + util.Version()))[0:8]
+ cacheUniqueId := util.Md5String([]byte("webdav" + string(option.Filer) + util.Version()))[0:8]
cacheDir := path.Join(option.CacheDir, cacheUniqueId)
os.MkdirAll(cacheDir, os.FileMode(0755))
@@ -121,12 +119,12 @@ func NewWebDavFileSystem(option *WebDavOption) (webdav.FileSystem, error) {
var _ = filer_pb.FilerClient(&WebDavFileSystem{})
-func (fs *WebDavFileSystem) WithFilerClient(fn func(filer_pb.SeaweedFilerClient) error) error {
+func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error {
- return pb.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
+ return pb.WithGrpcClient(streamingMode, func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, fs.option.FilerGrpcAddress, fs.option.GrpcDialOption)
+ }, fs.option.Filer.ToGrpcAddress(), fs.option.GrpcDialOption)
}
func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
@@ -163,7 +161,7 @@ func (fs *WebDavFileSystem) Mkdir(ctx context.Context, fullDirPath string, perm
return os.ErrExist
}
- return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
dir, name := util.FullPath(fullDirPath).DirAndName()
request := &filer_pb.CreateEntryRequest{
Directory: dir,
@@ -213,21 +211,19 @@ func (fs *WebDavFileSystem) OpenFile(ctx context.Context, fullFilePath string, f
}
dir, name := util.FullPath(fullFilePath).DirAndName()
- err = fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ err = fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
if err := filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
Directory: dir,
Entry: &filer_pb.Entry{
Name: name,
IsDirectory: perm&os.ModeDir > 0,
Attributes: &filer_pb.FuseAttributes{
- Mtime: time.Now().Unix(),
- Crtime: time.Now().Unix(),
- FileMode: uint32(perm),
- Uid: fs.option.Uid,
- Gid: fs.option.Gid,
- Collection: fs.option.Collection,
- Replication: fs.option.Replication,
- TtlSec: 0,
+ Mtime: time.Now().Unix(),
+ Crtime: time.Now().Unix(),
+ FileMode: uint32(perm),
+ Uid: fs.option.Uid,
+ Gid: fs.option.Gid,
+ TtlSec: 0,
},
},
Signatures: []int32{fs.signature},
@@ -316,7 +312,7 @@ func (fs *WebDavFileSystem) Rename(ctx context.Context, oldName, newName string)
oldDir, oldBaseName := util.FullPath(oldName).DirAndName()
newDir, newBaseName := util.FullPath(newName).DirAndName()
- return fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
request := &filer_pb.AtomicRenameEntryRequest{
OldDirectory: oldDir,
@@ -376,7 +372,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
var fileId, host string
var auth security.EncodedJwt
- if flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ if flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
ctx := context.Background()
@@ -398,7 +394,7 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
return fmt.Errorf("assign volume failure %v: %v", request, resp.Error)
}
- fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
+ fileId, host, auth = resp.FileId, resp.Location.Url, security.EncodedJwt(resp.Auth)
f.collection, f.replication = resp.Collection, resp.Replication
return nil
@@ -413,7 +409,16 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: fileUrl,
+ Filename: f.name,
+ Cipher: f.fs.option.Cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: auth,
+ }
+ uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
if flushErr != nil {
glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)
@@ -469,10 +474,8 @@ func (f *WebDavFile) Write(buf []byte) (int, error) {
f.entry.Chunks = manifestedChunks
}
- flushErr := f.fs.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ flushErr := f.fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
f.entry.Attributes.Mtime = time.Now().Unix()
- f.entry.Attributes.Collection = f.collection
- f.entry.Attributes.Replication = f.replication
request := &filer_pb.UpdateEntryRequest{
Directory: dir,
@@ -532,11 +535,11 @@ func (f *WebDavFile) Read(p []byte) (readSize int, err error) {
return 0, io.EOF
}
if f.entryViewCache == nil {
- f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, math.MaxInt64)
+ f.entryViewCache, _ = filer.NonOverlappingVisibleIntervals(filer.LookupFn(f.fs), f.entry.Chunks, 0, fileSize)
f.reader = nil
}
if f.reader == nil {
- chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, math.MaxInt64)
+ chunkViews := filer.ViewFromVisibleIntervals(f.entryViewCache, 0, fileSize)
f.reader = filer.NewChunkReaderAtFromClient(filer.LookupFn(f.fs), chunkViews, f.fs.chunkCache, fileSize)
}
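
The final hunk also bounds the visible-interval and chunk-view computation by the actual file size instead of `math.MaxInt64`. Throughout this file, `WithFilerClient` gains a leading `streamingMode` flag threaded down to `pb.WithGrpcClient`; every call site here passes false since these are short unary RPCs. A usage sketch, assuming it lives alongside the server code and using a hypothetical entry name:

```go
// Sketch: calling the updated WithFilerClient; the leading flag selects
// streaming vs unary connection handling in pb.WithGrpcClient.
package weed_server

import (
	"context"

	"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)

func lookupExample(fs *WebDavFileSystem) error {
	return fs.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
		_, err := client.LookupDirectoryEntry(context.Background(), &filer_pb.LookupDirectoryEntryRequest{
			Directory: "/",
			Name:      "example.txt", // hypothetical entry
		})
		return err
	})
}
```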