aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
authorhilimd <68371223+hilimd@users.noreply.github.com>2021-09-13 10:34:33 +0800
committerGitHub <noreply@github.com>2021-09-13 10:34:33 +0800
commit1de733fda507e1da94b2e4741c74ba7e5e2c5f76 (patch)
treeaed7ac29e27e0f8def942154603375396fae9489 /weed/server
parent27c05f8c0b5c7bda43babeb61d79684d11851111 (diff)
parent7591336a2269c1ad92266280634bcaea34f7a5d1 (diff)
downloadseaweedfs-1de733fda507e1da94b2e4741c74ba7e5e2c5f76.tar.xz
seaweedfs-1de733fda507e1da94b2e4741c74ba7e5e2c5f76.zip
Merge pull request #81 from chrislusf/master
sync
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go28
-rw-r--r--weed/server/filer_grpc_server.go5
-rw-r--r--weed/server/filer_grpc_server_remote.go53
-rw-r--r--weed/server/filer_grpc_server_sub_meta.go25
-rw-r--r--weed/server/filer_server.go2
-rw-r--r--weed/server/filer_server_handlers_read.go3
-rw-r--r--weed/server/filer_server_handlers_read_dir.go11
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go11
-rw-r--r--weed/server/filer_server_handlers_write_cipher.go11
-rw-r--r--weed/server/filer_server_handlers_write_upload.go11
-rw-r--r--weed/server/filer_server_rocksdb.go1
-rw-r--r--weed/server/filer_ui/filer.html9
-rw-r--r--weed/server/master_grpc_server.go14
-rw-r--r--weed/server/master_grpc_server_volume.go11
-rw-r--r--weed/server/master_server.go4
-rw-r--r--weed/server/master_server_handlers.go3
-rw-r--r--weed/server/volume_grpc_client_to_master.go13
-rw-r--r--weed/server/volume_grpc_copy.go47
-rw-r--r--weed/server/volume_grpc_erasure_coding.go25
-rw-r--r--weed/server/volume_grpc_remote.go75
-rw-r--r--weed/server/volume_server.go3
-rw-r--r--weed/server/webdav_server.go11
22 files changed, 264 insertions, 112 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index 2cd2276eb..2054e1a84 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -33,7 +33,24 @@ func init() {
go serverStats.Start()
}
+// bodyAllowedForStatus is a copy of http.bodyAllowedForStatus non-exported function.
+func bodyAllowedForStatus(status int) bool {
+ switch {
+ case status >= 100 && status <= 199:
+ return false
+ case status == http.StatusNoContent:
+ return false
+ case status == http.StatusNotModified:
+ return false
+ }
+ return true
+}
+
func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) {
+ if !bodyAllowedForStatus(httpStatus) {
+ return
+ }
+
var bytes []byte
if obj != nil {
if r.FormValue("pretty") != "" {
@@ -144,7 +161,16 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope
}
debug("upload file to store", url)
- uploadResult, err := operation.UploadData(url, pu.FileName, false, pu.Data, pu.IsGzipped, pu.MimeType, pu.PairMap, assignResult.Auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: url,
+ Filename: pu.FileName,
+ Cipher: false,
+ IsInputCompressed: pu.IsGzipped,
+ MimeType: pu.MimeType,
+ PairMap: pu.PairMap,
+ Jwt: assignResult.Auth,
+ }
+ uploadResult, err := operation.UploadData(pu.Data, uploadOption)
if err != nil {
writeJsonError(w, r, http.StatusInternalServerError, err)
return
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index 08b01dd09..6a7df0f87 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -384,6 +384,8 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR
func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) {
+ clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId"))
+
t := &filer_pb.GetFilerConfigurationResponse{
Masters: fs.option.Masters,
Collection: fs.option.Collection,
@@ -395,6 +397,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.
MetricsAddress: fs.metricsAddress,
MetricsIntervalSec: int32(fs.metricsIntervalSec),
Version: util.Version(),
+ ClusterId: string(clusterId),
}
glog.V(4).Infof("GetFilerConfiguration: %v", t)
@@ -409,7 +412,7 @@ func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedS
return err
}
- clientName := fmt.Sprintf("%s:%d", req.Name, req.GrpcPort)
+ clientName := util.JoinHostPort(req.Name, int(req.GrpcPort))
m := make(map[string]bool)
for _, tp := range req.Resources {
m[tp] = true
diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go
index 6d77e310a..c47356a8e 100644
--- a/weed/server/filer_grpc_server_remote.go
+++ b/weed/server/filer_grpc_server_remote.go
@@ -6,11 +6,13 @@ import (
"github.com/chrislusf/seaweedfs/weed/filer"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/pb/remote_pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/golang/protobuf/proto"
"strings"
+ "sync"
"time"
)
@@ -27,7 +29,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
}
// find mapping
- var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation
+ var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation
var localMountedDir string
for k, loc := range mappings.Mappings {
if strings.HasPrefix(req.Directory, k) {
@@ -43,7 +45,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
if err != nil {
return nil, err
}
- storageConf := &filer_pb.RemoteConf{}
+ storageConf := &remote_pb.RemoteConf{}
if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil {
return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr)
}
@@ -60,8 +62,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
}
// detect storage option
- // replication level is set to "000" to ensure only need to ask one volume server to fetch the data.
- so, err := fs.detectStorageOption(req.Directory, "", "000", 0, "", "", "")
+ so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "")
if err != nil {
return resp, err
}
@@ -79,12 +80,15 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
var chunks []*filer_pb.FileChunk
var fetchAndWriteErr error
+ var wg sync.WaitGroup
limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8)
for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize {
localOffset := offset
+ wg.Add(1)
limitedConcurrentExecutor.Execute(func() {
+ defer wg.Done()
size := chunkSize
if localOffset+chunkSize > entry.Remote.RemoteSize {
size = entry.Remote.RemoteSize - localOffset
@@ -106,22 +110,30 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
return
}
+ var replicas []*volume_server_pb.FetchAndWriteNeedleRequest_Replica
+ for _, r := range assignResult.Replicas {
+ replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{
+ Url: r.Url,
+ PublicUrl: r.PublicUrl,
+ })
+ }
+
// tell filer to tell volume server to download into needles
err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
_, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{
- VolumeId: uint32(fileId.VolumeId),
- NeedleId: uint64(fileId.Key),
- Cookie: uint32(fileId.Cookie),
- Offset: localOffset,
- Size: size,
- RemoteType: storageConf.Type,
- RemoteName: storageConf.Name,
- S3AccessKey: storageConf.S3AccessKey,
- S3SecretKey: storageConf.S3SecretKey,
- S3Region: storageConf.S3Region,
- S3Endpoint: storageConf.S3Endpoint,
- RemoteBucket: remoteStorageMountedLocation.Bucket,
- RemotePath: string(dest),
+ VolumeId: uint32(fileId.VolumeId),
+ NeedleId: uint64(fileId.Key),
+ Cookie: uint32(fileId.Cookie),
+ Offset: localOffset,
+ Size: size,
+ Replicas: replicas,
+ Auth: string(assignResult.Auth),
+ RemoteConf: storageConf,
+ RemoteLocation: &remote_pb.RemoteStorageLocation{
+ Name: remoteStorageMountedLocation.Name,
+ Bucket: remoteStorageMountedLocation.Bucket,
+ Path: string(dest),
+ },
})
if fetchAndWriteErr != nil {
return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr)
@@ -129,7 +141,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
return nil
})
- if err != nil {
+ if err != nil && fetchAndWriteErr == nil {
fetchAndWriteErr = err
return
}
@@ -148,6 +160,11 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo
})
}
+ wg.Wait()
+ if fetchAndWriteErr != nil {
+ return nil, fetchAndWriteErr
+ }
+
garbage := entry.Chunks
newEntry := entry.ShallowClone()
diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go
index 3fdac1b26..a900275b9 100644
--- a/weed/server/filer_grpc_server_sub_meta.go
+++ b/weed/server/filer_grpc_server_sub_meta.go
@@ -201,14 +201,18 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
return nil
}
- if !strings.HasPrefix(fullpath, req.PathPrefix) {
- if eventNotification.NewParentPath != "" {
- newFullPath := util.Join(eventNotification.NewParentPath, entryName)
- if !strings.HasPrefix(newFullPath, req.PathPrefix) {
+ if hasPrefixIn(fullpath, req.PathPrefixes) {
+ // good
+ } else {
+ if !strings.HasPrefix(fullpath, req.PathPrefix) {
+ if eventNotification.NewParentPath != "" {
+ newFullPath := util.Join(eventNotification.NewParentPath, entryName)
+ if !strings.HasPrefix(newFullPath, req.PathPrefix) {
+ return nil
+ }
+ } else {
return nil
}
- } else {
- return nil
}
}
@@ -227,6 +231,15 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe
}
}
+func hasPrefixIn(text string, prefixes []string) bool {
+ for _, p := range prefixes {
+ if strings.HasPrefix(text, p) {
+ return true
+ }
+ }
+ return false
+}
+
func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) {
clientName = clientType + "@" + clientAddress
glog.V(0).Infof("+ listener %v", clientName)
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 534bc4840..7e5e98660 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -143,7 +143,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
readonlyMux.HandleFunc("/", fs.readonlyFilerHandler)
}
- fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers)
+ fs.filer.AggregateFromPeers(util.JoinHostPort(option.Host, int(option.Port)), option.Filers)
fs.filer.LoadBuckets()
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index fc9cacf39..054a1bd00 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -142,6 +142,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
if rangeReq := r.Header.Get("Range"); rangeReq == "" {
ext := filepath.Ext(filename)
+ if len(ext) > 0 {
+ ext = strings.ToLower(ext)
+ }
width, height, mode, shouldResize := shouldResizeImages(ext, r)
if shouldResize {
data, err := filer.ReadAll(fs.filer.MasterClient, entry.Chunks)
diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go
index 307c411b6..f67e90d38 100644
--- a/weed/server/filer_server_handlers_read_dir.go
+++ b/weed/server/filer_server_handlers_read_dir.go
@@ -2,9 +2,6 @@ package weed_server
import (
"context"
- "encoding/base64"
- "fmt"
- "github.com/skip2/go-qrcode"
"net/http"
"strconv"
"strings"
@@ -72,12 +69,6 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
return
}
- var qrImageString string
- img, err := qrcode.Encode(fmt.Sprintf("http://%s:%d%s", fs.option.Host, fs.option.Port, r.URL.Path), qrcode.Medium, 128)
- if err == nil {
- qrImageString = base64.StdEncoding.EncodeToString(img)
- }
-
ui.StatusTpl.Execute(w, struct {
Path string
Breadcrumbs []ui.Breadcrumb
@@ -85,7 +76,6 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
Limit int
LastFileName string
ShouldDisplayLoadMore bool
- QrImage string
}{
path,
ui.ToBreadcrumb(path),
@@ -93,6 +83,5 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque
limit,
lastFileName,
shouldDisplayLoadMore,
- qrImageString,
})
}
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index a42e0fc97..6323d1589 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -241,7 +241,16 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs
}
// upload the chunk to the volume server
- uploadResult, uploadErr, _ := operation.Upload(urlLocation, name, fs.option.Cipher, reader, false, "", nil, auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: urlLocation,
+ Filename: name,
+ Cipher: fs.option.Cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: auth,
+ }
+ uploadResult, uploadErr, _ := operation.Upload(reader, uploadOption)
if uploadErr != nil {
return nil, "", "", uploadErr
}
diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go
index acaa8f5ab..14fa10e2c 100644
--- a/weed/server/filer_server_handlers_write_cipher.go
+++ b/weed/server/filer_server_handlers_write_cipher.go
@@ -44,7 +44,16 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht
// println("detect2 mimetype to", pu.MimeType)
}
- uploadResult, uploadError := operation.UploadData(urlLocation, pu.FileName, true, uncompressedData, false, pu.MimeType, pu.PairMap, auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: urlLocation,
+ Filename: pu.FileName,
+ Cipher: true,
+ IsInputCompressed: false,
+ MimeType: pu.MimeType,
+ PairMap: pu.PairMap,
+ Jwt: auth,
+ }
+ uploadResult, uploadError := operation.UploadData(uncompressedData, uploadOption)
if uploadError != nil {
return nil, fmt.Errorf("upload to volume server: %v", uploadError)
}
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 2275ff1bc..196d7638e 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -127,7 +127,16 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil
stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds())
}()
- uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: urlLocation,
+ Filename: fileName,
+ Cipher: fs.option.Cipher,
+ IsInputCompressed: false,
+ MimeType: contentType,
+ PairMap: pairMap,
+ Jwt: auth,
+ }
+ uploadResult, err, data := operation.Upload(limitedReader, uploadOption)
if uploadResult != nil && uploadResult.RetryCount > 0 {
stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount))
}
diff --git a/weed/server/filer_server_rocksdb.go b/weed/server/filer_server_rocksdb.go
index 5fcc7e88f..75965e761 100644
--- a/weed/server/filer_server_rocksdb.go
+++ b/weed/server/filer_server_rocksdb.go
@@ -1,3 +1,4 @@
+//go:build rocksdb
// +build rocksdb
package weed_server
diff --git a/weed/server/filer_ui/filer.html b/weed/server/filer_ui/filer.html
index 84dc4d4d6..6f57c25d8 100644
--- a/weed/server/filer_ui/filer.html
+++ b/weed/server/filer_ui/filer.html
@@ -36,11 +36,6 @@
display: none;
}
- .qrImage {
- display: block;
- margin-left: auto;
- margin-right: auto;
- }
</style>
</head>
<body>
@@ -115,10 +110,6 @@
<br/>
<br/>
- <div class="navbar navbar-fixed-bottom">
- <img src="data:image/png;base64,{{.QrImage}}" class="qrImage"/>
- </div>
-
</div>
</body>
<script type="text/javascript">
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index afd479b21..94e050259 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -2,8 +2,8 @@ package weed_server
import (
"context"
- "fmt"
"github.com/chrislusf/seaweedfs/weed/storage/backend"
+ "github.com/chrislusf/seaweedfs/weed/util"
"net"
"strings"
"time"
@@ -22,7 +22,11 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
defer func() {
if dn != nil {
-
+ dn.Counter--
+ if dn.Counter > 0 {
+ glog.V(0).Infof("disconnect phantom volume server %s:%d remaining %d", dn.Ip, dn.Port, dn.Counter)
+ return
+ }
// if the volume server disconnects and reconnects quickly
// the unregister and register can race with each other
ms.Topo.UnRegisterDataNode(dn)
@@ -46,7 +50,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
ms.clientChansLock.RUnlock()
}
-
}
}()
@@ -68,13 +71,14 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
dc := ms.Topo.GetOrCreateDataCenter(dcName)
rack := dc.GetOrCreateRack(rackName)
dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts)
- glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort())
+ glog.V(0).Infof("added volume server %d: %v:%d", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort())
if err := stream.Send(&master_pb.HeartbeatResponse{
VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024,
}); err != nil {
glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err)
return err
}
+ dn.Counter++
}
dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts)
@@ -284,7 +288,7 @@ func findClientAddress(ctx context.Context, grpcPort uint32) string {
}
if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok {
externalIP := tcpAddr.IP
- return fmt.Sprintf("%s:%d", externalIP, grpcPort)
+ return util.JoinHostPort(externalIP.String(), int(grpcPort))
}
return pr.Addr.String()
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 4b975a0c4..3a92889d2 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -147,14 +147,23 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
)
for time.Now().Sub(startTime) < maxTimeout {
- fid, count, dn, err := ms.Topo.PickForWrite(req.Count, option)
+ fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option)
if err == nil {
+ dn := dnList.Head()
+ var replicas []*master_pb.AssignResponse_Replica
+ for _, r := range dnList.Rest() {
+ replicas = append(replicas, &master_pb.AssignResponse_Replica{
+ Url: r.Url(),
+ PublicUrl: r.PublicUrl,
+ })
+ }
return &master_pb.AssignResponse{
Fid: fid,
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
Count: count,
Auth: string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)),
+ Replicas: replicas,
}, nil
}
//glog.V(4).Infoln("waiting for volume growing...")
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index d2edeb6cb..7c78be379 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -224,7 +224,7 @@ func (ms *MasterServer) startAdminScripts() {
scriptLines = append(scriptLines, "unlock")
}
- masterAddress := fmt.Sprintf("%s:%d", ms.option.Host, ms.option.Port)
+ masterAddress := util.JoinHostPort(ms.option.Host, ms.option.Port)
var shellOptions shell.ShellOptions
shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master")
@@ -299,7 +299,7 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer
case "snowflake":
var err error
snowflakeId := v.GetInt(SequencerSnowflakeId)
- seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port), snowflakeId)
+ seq, err = sequence.NewSnowflakeSequencer(util.JoinHostPort(option.Host, option.Port), snowflakeId)
if err != nil {
glog.Error(err)
seq = nil
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index 2a1f6d523..36c4239fb 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -130,9 +130,10 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request)
return
}
}
- fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option)
+ fid, count, dnList, err := ms.Topo.PickForWrite(requestedCount, option)
if err == nil {
ms.maybeAddJwtAuthorization(w, fid, true)
+ dn := dnList.Head()
writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count})
} else {
writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()})
diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go
index f8875169f..770abdab7 100644
--- a/weed/server/volume_grpc_client_to_master.go
+++ b/weed/server/volume_grpc_client_to_master.go
@@ -24,8 +24,7 @@ func (vs *VolumeServer) GetMaster() string {
}
func (vs *VolumeServer) checkWithMaster() (err error) {
- isConnected := false
- for !isConnected {
+ for {
for _, master := range vs.SeedMasterNodes {
err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{})
@@ -44,7 +43,6 @@ func (vs *VolumeServer) checkWithMaster() (err error) {
}
time.Sleep(1790 * time.Millisecond)
}
- return
}
func (vs *VolumeServer) heartbeat() {
@@ -128,6 +126,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
if vs.store.MaybeAdjustVolumeMax() {
if err = stream.Send(vs.store.CollectHeartbeat()); err != nil {
glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err)
+ return
}
}
}
@@ -161,7 +160,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
&volumeMessage,
},
}
- glog.V(1).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
+ glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
return "", err
@@ -172,7 +171,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
&ecShardMessage,
},
}
- glog.V(1).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
+ glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
@@ -184,7 +183,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
&volumeMessage,
},
}
- glog.V(1).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
+ glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id)
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
return "", err
@@ -195,7 +194,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi
&ecShardMessage,
},
}
- glog.V(1).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
+ glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id,
erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds())
if err = stream.Send(deltaBeat); err != nil {
glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err)
diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go
index 2ad77a7ff..53ee3df0a 100644
--- a/weed/server/volume_grpc_copy.go
+++ b/weed/server/volume_grpc_copy.go
@@ -79,17 +79,27 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}()
// println("source:", volFileInfoResp.String())
- if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil {
+ var modifiedTsNs int64
+ if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil {
return err
}
+ if modifiedTsNs > 0 {
+ os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
+ }
- if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil {
+ if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil {
return err
}
+ if modifiedTsNs > 0 {
+ os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
+ }
- if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil {
+ if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil {
return err
}
+ if modifiedTsNs > 0 {
+ os.Chtimes(dataBaseFileName+".vif", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs))
+ }
os.Remove(dataBaseFileName + ".note")
@@ -129,7 +139,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo
}, err
}
-func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error {
+func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) (modifiedTsNs int64, err error) {
copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{
VolumeId: vid,
@@ -141,15 +151,15 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i
IgnoreSourceFileNotFound: ignoreSourceFileNotFound,
})
if err != nil {
- return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
+ return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err)
}
- err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend)
+ modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend)
if err != nil {
- return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
+ return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err)
}
- return nil
+ return modifiedTsNs, nil
}
@@ -178,7 +188,7 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse
return nil
}
-func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error {
+func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) (modifiedTsNs int64, err error) {
glog.V(4).Infof("writing to %s", fileName)
flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC
if isAppend {
@@ -186,7 +196,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
}
dst, err := os.OpenFile(fileName, flags, 0644)
if err != nil {
- return nil
+ return modifiedTsNs, nil
}
defer dst.Close()
@@ -195,13 +205,16 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s
if receiveErr == io.EOF {
break
}
+ if resp.ModifiedTsNs != 0 {
+ modifiedTsNs = resp.ModifiedTsNs
+ }
if receiveErr != nil {
- return fmt.Errorf("receiving %s: %v", fileName, receiveErr)
+ return modifiedTsNs, fmt.Errorf("receiving %s: %v", fileName, receiveErr)
}
dst.Write(resp.FileContent)
wt.MaybeSlowdown(int64(len(resp.FileContent)))
}
- return nil
+ return modifiedTsNs, nil
}
func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) {
@@ -271,6 +284,12 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
}
defer file.Close()
+ fileInfo, err := file.Stat()
+ if err != nil {
+ return err
+ }
+ fileModTsNs := fileInfo.ModTime().UnixNano()
+
buffer := make([]byte, BufferSizeLimit)
for bytesToRead > 0 {
@@ -290,12 +309,14 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v
bytesread = int(bytesToRead)
}
err = stream.Send(&volume_server_pb.CopyFileResponse{
- FileContent: buffer[:bytesread],
+ FileContent: buffer[:bytesread],
+ ModifiedTsNs: fileModTsNs,
})
if err != nil {
// println("sending", bytesread, "bytes err", err.Error())
return err
}
+ fileModTsNs = 0 // only send once
bytesToRead -= int64(bytesread)
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 452c2766e..364045d9b 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -3,6 +3,7 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/storage/volume_info"
"io"
"io/ioutil"
"math"
@@ -12,7 +13,6 @@ import (
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
- "github.com/chrislusf/seaweedfs/weed/pb"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/storage"
"github.com/chrislusf/seaweedfs/weed/storage/erasure_coding"
@@ -49,6 +49,17 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection)
}
+ shouldCleanup := true
+ defer func() {
+ if !shouldCleanup {
+ return
+ }
+ for i := 0; i < erasure_coding.TotalShardsCount; i++ {
+ os.Remove(fmt.Sprintf("%s.ec%2d", baseFileName, i))
+ }
+ os.Remove(v.IndexFileName() + ".ecx")
+ }()
+
// write .ec00 ~ .ec13 files
if err := erasure_coding.WriteEcFiles(baseFileName); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
@@ -60,10 +71,12 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_
}
// write .vif files
- if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
+ if err := volume_info.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil {
return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err)
}
+ shouldCleanup = false
+
return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil
}
@@ -117,7 +130,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
// copy ec data slices
for _, shardId := range req.ShardIds {
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
+ if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil {
return err
}
}
@@ -125,7 +138,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
if req.CopyEcxFile {
// copy ecx file
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil {
+ if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil {
return err
}
return nil
@@ -133,14 +146,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv
if req.CopyEcjFile {
// copy ecj file
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil {
+ if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil {
return err
}
}
if req.CopyVifFile {
// copy vif file
- if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil {
+ if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil {
return err
}
}
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
index 5ca6619bd..aff57e52b 100644
--- a/weed/server/volume_grpc_remote.go
+++ b/weed/server/volume_grpc_remote.go
@@ -3,11 +3,14 @@ package weed_server
import (
"context"
"fmt"
- "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "sync"
+ "time"
)
func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) {
@@ -17,40 +20,64 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
return nil, fmt.Errorf("not found volume id %d", req.VolumeId)
}
- remoteConf := &filer_pb.RemoteConf{
- Type: req.RemoteType,
- Name: req.RemoteName,
- S3AccessKey: req.S3AccessKey,
- S3SecretKey: req.S3SecretKey,
- S3Region: req.S3Region,
- S3Endpoint: req.S3Endpoint,
- }
+ remoteConf := req.RemoteConf
client, getClientErr := remote_storage.GetRemoteStorage(remoteConf)
if getClientErr != nil {
return nil, fmt.Errorf("get remote client: %v", getClientErr)
}
- remoteStorageLocation := &filer_pb.RemoteStorageLocation{
- Name: req.RemoteName,
- Bucket: req.RemoteBucket,
- Path: req.RemotePath,
- }
+ remoteStorageLocation := req.RemoteLocation
+
data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size)
if ReadRemoteErr != nil {
return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr)
}
- n := new(needle.Needle)
- n.Id = types.NeedleId(req.NeedleId)
- n.Cookie = types.Cookie(req.Cookie)
- n.Data, n.DataSize = data, uint32(len(data))
- // copied from *Needle.prepareWriteBuffer()
- n.Size = 4 + types.Size(n.DataSize) + 1
- n.Checksum = needle.NewCRC(n.Data)
- if _, err = vs.store.WriteVolumeNeedle(v.Id, n, true, false); err != nil {
- return nil, fmt.Errorf("write needle %d size %d: %v", req.NeedleId, req.Size, err)
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ n := new(needle.Needle)
+ n.Id = types.NeedleId(req.NeedleId)
+ n.Cookie = types.Cookie(req.Cookie)
+ n.Data, n.DataSize = data, uint32(len(data))
+ // copied from *Needle.prepareWriteBuffer()
+ n.Size = 4 + types.Size(n.DataSize) + 1
+ n.Checksum = needle.NewCRC(n.Data)
+ n.LastModified = uint64(time.Now().Unix())
+ n.SetHasLastModifiedDate()
+ if _, localWriteErr := vs.store.WriteVolumeNeedle(v.Id, n, true, false); localWriteErr != nil {
+ if err == nil {
+ err = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, err)
+ }
+ }
+ }()
+ if len(req.Replicas)>0{
+ fileId := needle.NewFileId(v.Id, req.NeedleId, req.Cookie)
+ for _, replica := range req.Replicas {
+ wg.Add(1)
+ go func(targetVolumeServer string) {
+ defer wg.Done()
+ uploadOption := &operation.UploadOption{
+ UploadUrl: fmt.Sprintf("http://%s/%s?type=replicate", targetVolumeServer, fileId.String()),
+ Filename: "",
+ Cipher: false,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: security.EncodedJwt(req.Auth),
+ }
+ if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil {
+ if err == nil {
+ err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err)
+ }
+ }
+ }(replica.Url)
+ }
}
- return resp, nil
+ wg.Wait()
+
+ return resp, err
}
diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go
index 034521b4b..9406b5601 100644
--- a/weed/server/volume_server.go
+++ b/weed/server/volume_server.go
@@ -1,7 +1,6 @@
package weed_server
import (
- "fmt"
"github.com/chrislusf/seaweedfs/weed/storage/types"
"net/http"
"sync"
@@ -113,7 +112,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string,
}
go vs.heartbeat()
- go stats.LoopPushingMetric("volumeServer", fmt.Sprintf("%s:%d", ip, port), vs.metricsAddress, vs.metricsIntervalSec)
+ go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec)
return vs
}
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 68c1f3233..e99d4a358 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -413,7 +413,16 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64
}
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
- uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth)
+ uploadOption := &operation.UploadOption{
+ UploadUrl: fileUrl,
+ Filename: f.name,
+ Cipher: f.fs.option.Cipher,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: auth,
+ }
+ uploadResult, flushErr, _ := operation.Upload(reader, uploadOption)
if flushErr != nil {
glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr)
return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr)