| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2021-09-13 10:34:33 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2021-09-13 10:34:33 +0800 |
| commit | 1de733fda507e1da94b2e4741c74ba7e5e2c5f76 | |
| tree | aed7ac29e27e0f8def942154603375396fae9489 /weed/server | |
| parent | 27c05f8c0b5c7bda43babeb61d79684d11851111 | |
| parent | 7591336a2269c1ad92266280634bcaea34f7a5d1 | |
Merge pull request #81 from chrislusf/master
sync
Diffstat (limited to 'weed/server')
22 files changed, 264 insertions, 112 deletions
diff --git a/weed/server/common.go b/weed/server/common.go index 2cd2276eb..2054e1a84 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -33,7 +33,24 @@ func init() { go serverStats.Start() } +// bodyAllowedForStatus is a copy of http.bodyAllowedForStatus non-exported function. +func bodyAllowedForStatus(status int) bool { + switch { + case status >= 100 && status <= 199: + return false + case status == http.StatusNoContent: + return false + case status == http.StatusNotModified: + return false + } + return true +} + func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) { + if !bodyAllowedForStatus(httpStatus) { + return + } + var bytes []byte if obj != nil { if r.FormValue("pretty") != "" { @@ -144,7 +161,16 @@ func submitForClientHandler(w http.ResponseWriter, r *http.Request, masterFn ope } debug("upload file to store", url) - uploadResult, err := operation.UploadData(url, pu.FileName, false, pu.Data, pu.IsGzipped, pu.MimeType, pu.PairMap, assignResult.Auth) + uploadOption := &operation.UploadOption{ + UploadUrl: url, + Filename: pu.FileName, + Cipher: false, + IsInputCompressed: pu.IsGzipped, + MimeType: pu.MimeType, + PairMap: pu.PairMap, + Jwt: assignResult.Auth, + } + uploadResult, err := operation.UploadData(pu.Data, uploadOption) if err != nil { writeJsonError(w, r, http.StatusInternalServerError, err) return diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index 08b01dd09..6a7df0f87 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -384,6 +384,8 @@ func (fs *FilerServer) Statistics(ctx context.Context, req *filer_pb.StatisticsR func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb.GetFilerConfigurationRequest) (resp *filer_pb.GetFilerConfigurationResponse, err error) { + clusterId, _ := fs.filer.Store.KvGet(context.Background(), []byte("clusterId")) + t := &filer_pb.GetFilerConfigurationResponse{ Masters: fs.option.Masters, Collection: fs.option.Collection, @@ -395,6 +397,7 @@ func (fs *FilerServer) GetFilerConfiguration(ctx context.Context, req *filer_pb. 
MetricsAddress: fs.metricsAddress, MetricsIntervalSec: int32(fs.metricsIntervalSec), Version: util.Version(), + ClusterId: string(clusterId), } glog.V(4).Infof("GetFilerConfiguration: %v", t) @@ -409,7 +412,7 @@ func (fs *FilerServer) KeepConnected(stream filer_pb.SeaweedFiler_KeepConnectedS return err } - clientName := fmt.Sprintf("%s:%d", req.Name, req.GrpcPort) + clientName := util.JoinHostPort(req.Name, int(req.GrpcPort)) m := make(map[string]bool) for _, tp := range req.Resources { m[tp] = true diff --git a/weed/server/filer_grpc_server_remote.go b/weed/server/filer_grpc_server_remote.go index 6d77e310a..c47356a8e 100644 --- a/weed/server/filer_grpc_server_remote.go +++ b/weed/server/filer_grpc_server_remote.go @@ -6,11 +6,13 @@ import ( "github.com/chrislusf/seaweedfs/weed/filer" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/pb/remote_pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/util" "github.com/golang/protobuf/proto" "strings" + "sync" "time" ) @@ -27,7 +29,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo } // find mapping - var remoteStorageMountedLocation *filer_pb.RemoteStorageLocation + var remoteStorageMountedLocation *remote_pb.RemoteStorageLocation var localMountedDir string for k, loc := range mappings.Mappings { if strings.HasPrefix(req.Directory, k) { @@ -43,7 +45,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo if err != nil { return nil, err } - storageConf := &filer_pb.RemoteConf{} + storageConf := &remote_pb.RemoteConf{} if unMarshalErr := proto.Unmarshal(storageConfEntry.Content, storageConf); unMarshalErr != nil { return nil, fmt.Errorf("unmarshal remote storage conf %s/%s: %v", filer.DirectoryEtcRemote, remoteStorageMountedLocation.Name+filer.REMOTE_STORAGE_CONF_SUFFIX, unMarshalErr) } @@ -60,8 +62,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo } // detect storage option - // replication level is set to "000" to ensure only need to ask one volume server to fetch the data. 
- so, err := fs.detectStorageOption(req.Directory, "", "000", 0, "", "", "") + so, err := fs.detectStorageOption(req.Directory, "", "", 0, "", "", "") if err != nil { return resp, err } @@ -79,12 +80,15 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo var chunks []*filer_pb.FileChunk var fetchAndWriteErr error + var wg sync.WaitGroup limitedConcurrentExecutor := util.NewLimitedConcurrentExecutor(8) for offset := int64(0); offset < entry.Remote.RemoteSize; offset += chunkSize { localOffset := offset + wg.Add(1) limitedConcurrentExecutor.Execute(func() { + defer wg.Done() size := chunkSize if localOffset+chunkSize > entry.Remote.RemoteSize { size = entry.Remote.RemoteSize - localOffset @@ -106,22 +110,30 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo return } + var replicas []*volume_server_pb.FetchAndWriteNeedleRequest_Replica + for _, r := range assignResult.Replicas { + replicas = append(replicas, &volume_server_pb.FetchAndWriteNeedleRequest_Replica{ + Url: r.Url, + PublicUrl: r.PublicUrl, + }) + } + // tell filer to tell volume server to download into needles err = operation.WithVolumeServerClient(assignResult.Url, fs.grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error { _, fetchAndWriteErr := volumeServerClient.FetchAndWriteNeedle(context.Background(), &volume_server_pb.FetchAndWriteNeedleRequest{ - VolumeId: uint32(fileId.VolumeId), - NeedleId: uint64(fileId.Key), - Cookie: uint32(fileId.Cookie), - Offset: localOffset, - Size: size, - RemoteType: storageConf.Type, - RemoteName: storageConf.Name, - S3AccessKey: storageConf.S3AccessKey, - S3SecretKey: storageConf.S3SecretKey, - S3Region: storageConf.S3Region, - S3Endpoint: storageConf.S3Endpoint, - RemoteBucket: remoteStorageMountedLocation.Bucket, - RemotePath: string(dest), + VolumeId: uint32(fileId.VolumeId), + NeedleId: uint64(fileId.Key), + Cookie: uint32(fileId.Cookie), + Offset: localOffset, + Size: size, + Replicas: replicas, + Auth: string(assignResult.Auth), + RemoteConf: storageConf, + RemoteLocation: &remote_pb.RemoteStorageLocation{ + Name: remoteStorageMountedLocation.Name, + Bucket: remoteStorageMountedLocation.Bucket, + Path: string(dest), + }, }) if fetchAndWriteErr != nil { return fmt.Errorf("volume server %s fetchAndWrite %s: %v", assignResult.Url, dest, fetchAndWriteErr) @@ -129,7 +141,7 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo return nil }) - if err != nil { + if err != nil && fetchAndWriteErr == nil { fetchAndWriteErr = err return } @@ -148,6 +160,11 @@ func (fs *FilerServer) DownloadToLocal(ctx context.Context, req *filer_pb.Downlo }) } + wg.Wait() + if fetchAndWriteErr != nil { + return nil, fetchAndWriteErr + } + garbage := entry.Chunks newEntry := entry.ShallowClone() diff --git a/weed/server/filer_grpc_server_sub_meta.go b/weed/server/filer_grpc_server_sub_meta.go index 3fdac1b26..a900275b9 100644 --- a/weed/server/filer_grpc_server_sub_meta.go +++ b/weed/server/filer_grpc_server_sub_meta.go @@ -201,14 +201,18 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe return nil } - if !strings.HasPrefix(fullpath, req.PathPrefix) { - if eventNotification.NewParentPath != "" { - newFullPath := util.Join(eventNotification.NewParentPath, entryName) - if !strings.HasPrefix(newFullPath, req.PathPrefix) { + if hasPrefixIn(fullpath, req.PathPrefixes) { + // good + } else { + if !strings.HasPrefix(fullpath, req.PathPrefix) { + if 
eventNotification.NewParentPath != "" { + newFullPath := util.Join(eventNotification.NewParentPath, entryName) + if !strings.HasPrefix(newFullPath, req.PathPrefix) { + return nil + } + } else { return nil } - } else { - return nil } } @@ -227,6 +231,15 @@ func (fs *FilerServer) eachEventNotificationFn(req *filer_pb.SubscribeMetadataRe } } +func hasPrefixIn(text string, prefixes []string) bool { + for _, p := range prefixes { + if strings.HasPrefix(text, p) { + return true + } + } + return false +} + func (fs *FilerServer) addClient(clientType string, clientAddress string) (clientName string) { clientName = clientType + "@" + clientAddress glog.V(0).Infof("+ listener %v", clientName) diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go index 534bc4840..7e5e98660 100644 --- a/weed/server/filer_server.go +++ b/weed/server/filer_server.go @@ -143,7 +143,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption) readonlyMux.HandleFunc("/", fs.readonlyFilerHandler) } - fs.filer.AggregateFromPeers(fmt.Sprintf("%s:%d", option.Host, option.Port), option.Filers) + fs.filer.AggregateFromPeers(util.JoinHostPort(option.Host, int(option.Port)), option.Filers) fs.filer.LoadBuckets() diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index fc9cacf39..054a1bd00 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -142,6 +142,9 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) if rangeReq := r.Header.Get("Range"); rangeReq == "" { ext := filepath.Ext(filename) + if len(ext) > 0 { + ext = strings.ToLower(ext) + } width, height, mode, shouldResize := shouldResizeImages(ext, r) if shouldResize { data, err := filer.ReadAll(fs.filer.MasterClient, entry.Chunks) diff --git a/weed/server/filer_server_handlers_read_dir.go b/weed/server/filer_server_handlers_read_dir.go index 307c411b6..f67e90d38 100644 --- a/weed/server/filer_server_handlers_read_dir.go +++ b/weed/server/filer_server_handlers_read_dir.go @@ -2,9 +2,6 @@ package weed_server import ( "context" - "encoding/base64" - "fmt" - "github.com/skip2/go-qrcode" "net/http" "strconv" "strings" @@ -72,12 +69,6 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque return } - var qrImageString string - img, err := qrcode.Encode(fmt.Sprintf("http://%s:%d%s", fs.option.Host, fs.option.Port, r.URL.Path), qrcode.Medium, 128) - if err == nil { - qrImageString = base64.StdEncoding.EncodeToString(img) - } - ui.StatusTpl.Execute(w, struct { Path string Breadcrumbs []ui.Breadcrumb @@ -85,7 +76,6 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque Limit int LastFileName string ShouldDisplayLoadMore bool - QrImage string }{ path, ui.ToBreadcrumb(path), @@ -93,6 +83,5 @@ func (fs *FilerServer) listDirectoryHandler(w http.ResponseWriter, r *http.Reque limit, lastFileName, shouldDisplayLoadMore, - qrImageString, }) } diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go index a42e0fc97..6323d1589 100644 --- a/weed/server/filer_server_handlers_write_autochunk.go +++ b/weed/server/filer_server_handlers_write_autochunk.go @@ -241,7 +241,16 @@ func (fs *FilerServer) saveAsChunk(so *operation.StorageOption) filer.SaveDataAs } // upload the chunk to the volume server - uploadResult, uploadErr, _ := operation.Upload(urlLocation, name, fs.option.Cipher, reader, false, "", nil, auth) + 
uploadOption := &operation.UploadOption{ + UploadUrl: urlLocation, + Filename: name, + Cipher: fs.option.Cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: auth, + } + uploadResult, uploadErr, _ := operation.Upload(reader, uploadOption) if uploadErr != nil { return nil, "", "", uploadErr } diff --git a/weed/server/filer_server_handlers_write_cipher.go b/weed/server/filer_server_handlers_write_cipher.go index acaa8f5ab..14fa10e2c 100644 --- a/weed/server/filer_server_handlers_write_cipher.go +++ b/weed/server/filer_server_handlers_write_cipher.go @@ -44,7 +44,16 @@ func (fs *FilerServer) encrypt(ctx context.Context, w http.ResponseWriter, r *ht // println("detect2 mimetype to", pu.MimeType) } - uploadResult, uploadError := operation.UploadData(urlLocation, pu.FileName, true, uncompressedData, false, pu.MimeType, pu.PairMap, auth) + uploadOption := &operation.UploadOption{ + UploadUrl: urlLocation, + Filename: pu.FileName, + Cipher: true, + IsInputCompressed: false, + MimeType: pu.MimeType, + PairMap: pu.PairMap, + Jwt: auth, + } + uploadResult, uploadError := operation.UploadData(uncompressedData, uploadOption) if uploadError != nil { return nil, fmt.Errorf("upload to volume server: %v", uploadError) } diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 2275ff1bc..196d7638e 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -127,7 +127,16 @@ func (fs *FilerServer) doUpload(urlLocation string, limitedReader io.Reader, fil stats.FilerRequestHistogram.WithLabelValues("chunkUpload").Observe(time.Since(start).Seconds()) }() - uploadResult, err, data := operation.Upload(urlLocation, fileName, fs.option.Cipher, limitedReader, false, contentType, pairMap, auth) + uploadOption := &operation.UploadOption{ + UploadUrl: urlLocation, + Filename: fileName, + Cipher: fs.option.Cipher, + IsInputCompressed: false, + MimeType: contentType, + PairMap: pairMap, + Jwt: auth, + } + uploadResult, err, data := operation.Upload(limitedReader, uploadOption) if uploadResult != nil && uploadResult.RetryCount > 0 { stats.FilerRequestCounter.WithLabelValues("chunkUploadRetry").Add(float64(uploadResult.RetryCount)) } diff --git a/weed/server/filer_server_rocksdb.go b/weed/server/filer_server_rocksdb.go index 5fcc7e88f..75965e761 100644 --- a/weed/server/filer_server_rocksdb.go +++ b/weed/server/filer_server_rocksdb.go @@ -1,3 +1,4 @@ +//go:build rocksdb // +build rocksdb package weed_server diff --git a/weed/server/filer_ui/filer.html b/weed/server/filer_ui/filer.html index 84dc4d4d6..6f57c25d8 100644 --- a/weed/server/filer_ui/filer.html +++ b/weed/server/filer_ui/filer.html @@ -36,11 +36,6 @@ display: none; } - .qrImage { - display: block; - margin-left: auto; - margin-right: auto; - } </style> </head> <body> @@ -115,10 +110,6 @@ <br/> <br/> - <div class="navbar navbar-fixed-bottom"> - <img src="data:image/png;base64,{{.QrImage}}" class="qrImage"/> - </div> - </div> </body> <script type="text/javascript"> diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index afd479b21..94e050259 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -2,8 +2,8 @@ package weed_server import ( "context" - "fmt" "github.com/chrislusf/seaweedfs/weed/storage/backend" + "github.com/chrislusf/seaweedfs/weed/util" "net" "strings" "time" @@ -22,7 +22,11 @@ func (ms *MasterServer) SendHeartbeat(stream 
master_pb.Seaweed_SendHeartbeatServ defer func() { if dn != nil { - + dn.Counter-- + if dn.Counter > 0 { + glog.V(0).Infof("disconnect phantom volume server %s:%d remaining %d", dn.Ip, dn.Port, dn.Counter) + return + } // if the volume server disconnects and reconnects quickly // the unregister and register can race with each other ms.Topo.UnRegisterDataNode(dn) @@ -46,7 +50,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ } ms.clientChansLock.RUnlock() } - } }() @@ -68,13 +71,14 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ dc := ms.Topo.GetOrCreateDataCenter(dcName) rack := dc.GetOrCreateRack(rackName) dn = rack.GetOrCreateDataNode(heartbeat.Ip, int(heartbeat.Port), heartbeat.PublicUrl, heartbeat.MaxVolumeCounts) - glog.V(0).Infof("added volume server %v:%d", heartbeat.GetIp(), heartbeat.GetPort()) + glog.V(0).Infof("added volume server %d: %v:%d", dn.Counter, heartbeat.GetIp(), heartbeat.GetPort()) if err := stream.Send(&master_pb.HeartbeatResponse{ VolumeSizeLimit: uint64(ms.option.VolumeSizeLimitMB) * 1024 * 1024, }); err != nil { glog.Warningf("SendHeartbeat.Send volume size to %s:%d %v", dn.Ip, dn.Port, err) return err } + dn.Counter++ } dn.AdjustMaxVolumeCounts(heartbeat.MaxVolumeCounts) @@ -284,7 +288,7 @@ func findClientAddress(ctx context.Context, grpcPort uint32) string { } if tcpAddr, ok := pr.Addr.(*net.TCPAddr); ok { externalIP := tcpAddr.IP - return fmt.Sprintf("%s:%d", externalIP, grpcPort) + return util.JoinHostPort(externalIP.String(), int(grpcPort)) } return pr.Addr.String() diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index 4b975a0c4..3a92889d2 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -147,14 +147,23 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest ) for time.Now().Sub(startTime) < maxTimeout { - fid, count, dn, err := ms.Topo.PickForWrite(req.Count, option) + fid, count, dnList, err := ms.Topo.PickForWrite(req.Count, option) if err == nil { + dn := dnList.Head() + var replicas []*master_pb.AssignResponse_Replica + for _, r := range dnList.Rest() { + replicas = append(replicas, &master_pb.AssignResponse_Replica{ + Url: r.Url(), + PublicUrl: r.PublicUrl, + }) + } return &master_pb.AssignResponse{ Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count, Auth: string(security.GenJwt(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), + Replicas: replicas, }, nil } //glog.V(4).Infoln("waiting for volume growing...") diff --git a/weed/server/master_server.go b/weed/server/master_server.go index d2edeb6cb..7c78be379 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -224,7 +224,7 @@ func (ms *MasterServer) startAdminScripts() { scriptLines = append(scriptLines, "unlock") } - masterAddress := fmt.Sprintf("%s:%d", ms.option.Host, ms.option.Port) + masterAddress := util.JoinHostPort(ms.option.Host, ms.option.Port) var shellOptions shell.ShellOptions shellOptions.GrpcDialOption = security.LoadClientTLS(v, "grpc.master") @@ -299,7 +299,7 @@ func (ms *MasterServer) createSequencer(option *MasterOption) sequence.Sequencer case "snowflake": var err error snowflakeId := v.GetInt(SequencerSnowflakeId) - seq, err = sequence.NewSnowflakeSequencer(fmt.Sprintf("%s:%d", option.Host, option.Port), snowflakeId) + seq, err = sequence.NewSnowflakeSequencer(util.JoinHostPort(option.Host, option.Port), snowflakeId) if err != nil { 
glog.Error(err) seq = nil diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 2a1f6d523..36c4239fb 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -130,9 +130,10 @@ func (ms *MasterServer) dirAssignHandler(w http.ResponseWriter, r *http.Request) return } } - fid, count, dn, err := ms.Topo.PickForWrite(requestedCount, option) + fid, count, dnList, err := ms.Topo.PickForWrite(requestedCount, option) if err == nil { ms.maybeAddJwtAuthorization(w, fid, true) + dn := dnList.Head() writeJsonQuiet(w, r, http.StatusOK, operation.AssignResult{Fid: fid, Url: dn.Url(), PublicUrl: dn.PublicUrl, Count: count}) } else { writeJsonQuiet(w, r, http.StatusNotAcceptable, operation.AssignResult{Error: err.Error()}) diff --git a/weed/server/volume_grpc_client_to_master.go b/weed/server/volume_grpc_client_to_master.go index f8875169f..770abdab7 100644 --- a/weed/server/volume_grpc_client_to_master.go +++ b/weed/server/volume_grpc_client_to_master.go @@ -24,8 +24,7 @@ func (vs *VolumeServer) GetMaster() string { } func (vs *VolumeServer) checkWithMaster() (err error) { - isConnected := false - for !isConnected { + for { for _, master := range vs.SeedMasterNodes { err = operation.WithMasterServerClient(master, vs.grpcDialOption, func(masterClient master_pb.SeaweedClient) error { resp, err := masterClient.GetMasterConfiguration(context.Background(), &master_pb.GetMasterConfigurationRequest{}) @@ -44,7 +43,6 @@ func (vs *VolumeServer) checkWithMaster() (err error) { } time.Sleep(1790 * time.Millisecond) } - return } func (vs *VolumeServer) heartbeat() { @@ -128,6 +126,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi if vs.store.MaybeAdjustVolumeMax() { if err = stream.Send(vs.store.CollectHeartbeat()); err != nil { glog.V(0).Infof("Volume Server Failed to talk with master %s: %v", vs.currentMaster, err) + return } } } @@ -161,7 +160,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi &volumeMessage, }, } - glog.V(1).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id) + glog.V(0).Infof("volume server %s:%d adds volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id) if err = stream.Send(deltaBeat); err != nil { glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) return "", err @@ -172,7 +171,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi &ecShardMessage, }, } - glog.V(1).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, + glog.V(0).Infof("volume server %s:%d adds ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds()) if err = stream.Send(deltaBeat); err != nil { glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) @@ -184,7 +183,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi &volumeMessage, }, } - glog.V(1).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id) + glog.V(0).Infof("volume server %s:%d deletes volume %d", vs.store.Ip, vs.store.Port, volumeMessage.Id) if err = stream.Send(deltaBeat); err != nil { glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) return "", err @@ -195,7 +194,7 @@ func (vs *VolumeServer) doHeartbeat(masterNode, masterGrpcAddress string, grpcDi 
&ecShardMessage, }, } - glog.V(1).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, + glog.V(0).Infof("volume server %s:%d deletes ec shard %d:%d", vs.store.Ip, vs.store.Port, ecShardMessage.Id, erasure_coding.ShardBits(ecShardMessage.EcIndexBits).ShardIds()) if err = stream.Send(deltaBeat); err != nil { glog.V(0).Infof("Volume Server Failed to update to master %s: %v", masterNode, err) diff --git a/weed/server/volume_grpc_copy.go b/weed/server/volume_grpc_copy.go index 2ad77a7ff..53ee3df0a 100644 --- a/weed/server/volume_grpc_copy.go +++ b/weed/server/volume_grpc_copy.go @@ -79,17 +79,27 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }() // println("source:", volFileInfoResp.String()) - if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil { + var modifiedTsNs int64 + if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".dat", false, true); err != nil { return err } + if modifiedTsNs > 0 { + os.Chtimes(dataBaseFileName+".dat", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) + } - if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil { + if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.IdxFileSize, indexBaseFileName, ".idx", false, false); err != nil { return err } + if modifiedTsNs > 0 { + os.Chtimes(indexBaseFileName+".idx", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) + } - if err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil { + if modifiedTsNs, err = vs.doCopyFile(client, false, req.Collection, req.VolumeId, volFileInfoResp.CompactionRevision, volFileInfoResp.DatFileSize, dataBaseFileName, ".vif", false, true); err != nil { return err } + if modifiedTsNs > 0 { + os.Chtimes(dataBaseFileName+".vif", time.Unix(0, modifiedTsNs), time.Unix(0, modifiedTsNs)) + } os.Remove(dataBaseFileName + ".note") @@ -129,7 +139,7 @@ func (vs *VolumeServer) VolumeCopy(ctx context.Context, req *volume_server_pb.Vo }, err } -func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) error { +func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, isEcVolume bool, collection string, vid, compactRevision uint32, stopOffset uint64, baseFileName, ext string, isAppend, ignoreSourceFileNotFound bool) (modifiedTsNs int64, err error) { copyFileClient, err := client.CopyFile(context.Background(), &volume_server_pb.CopyFileRequest{ VolumeId: vid, @@ -141,15 +151,15 @@ func (vs *VolumeServer) doCopyFile(client volume_server_pb.VolumeServerClient, i IgnoreSourceFileNotFound: ignoreSourceFileNotFound, }) if err != nil { - return fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) + return modifiedTsNs, fmt.Errorf("failed to start copying volume %d %s file: %v", vid, ext, err) } - err = writeToFile(copyFileClient, 
baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend) + modifiedTsNs, err = writeToFile(copyFileClient, baseFileName+ext, util.NewWriteThrottler(vs.compactionBytePerSecond), isAppend) if err != nil { - return fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) + return modifiedTsNs, fmt.Errorf("failed to copy %s file: %v", baseFileName+ext, err) } - return nil + return modifiedTsNs, nil } @@ -178,7 +188,7 @@ func checkCopyFiles(originFileInf *volume_server_pb.ReadVolumeFileStatusResponse return nil } -func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) error { +func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName string, wt *util.WriteThrottler, isAppend bool) (modifiedTsNs int64, err error) { glog.V(4).Infof("writing to %s", fileName) flags := os.O_WRONLY | os.O_CREATE | os.O_TRUNC if isAppend { @@ -186,7 +196,7 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s } dst, err := os.OpenFile(fileName, flags, 0644) if err != nil { - return nil + return modifiedTsNs, nil } defer dst.Close() @@ -195,13 +205,16 @@ func writeToFile(client volume_server_pb.VolumeServer_CopyFileClient, fileName s if receiveErr == io.EOF { break } + if resp.ModifiedTsNs != 0 { + modifiedTsNs = resp.ModifiedTsNs + } if receiveErr != nil { - return fmt.Errorf("receiving %s: %v", fileName, receiveErr) + return modifiedTsNs, fmt.Errorf("receiving %s: %v", fileName, receiveErr) } dst.Write(resp.FileContent) wt.MaybeSlowdown(int64(len(resp.FileContent))) } - return nil + return modifiedTsNs, nil } func (vs *VolumeServer) ReadVolumeFileStatus(ctx context.Context, req *volume_server_pb.ReadVolumeFileStatusRequest) (*volume_server_pb.ReadVolumeFileStatusResponse, error) { @@ -271,6 +284,12 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v } defer file.Close() + fileInfo, err := file.Stat() + if err != nil { + return err + } + fileModTsNs := fileInfo.ModTime().UnixNano() + buffer := make([]byte, BufferSizeLimit) for bytesToRead > 0 { @@ -290,12 +309,14 @@ func (vs *VolumeServer) CopyFile(req *volume_server_pb.CopyFileRequest, stream v bytesread = int(bytesToRead) } err = stream.Send(&volume_server_pb.CopyFileResponse{ - FileContent: buffer[:bytesread], + FileContent: buffer[:bytesread], + ModifiedTsNs: fileModTsNs, }) if err != nil { // println("sending", bytesread, "bytes err", err.Error()) return err } + fileModTsNs = 0 // only send once bytesToRead -= int64(bytesread) diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go index 452c2766e..364045d9b 100644 --- a/weed/server/volume_grpc_erasure_coding.go +++ b/weed/server/volume_grpc_erasure_coding.go @@ -3,6 +3,7 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/storage/volume_info" "io" "io/ioutil" "math" @@ -12,7 +13,6 @@ import ( "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" - "github.com/chrislusf/seaweedfs/weed/pb" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/storage" "github.com/chrislusf/seaweedfs/weed/storage/erasure_coding" @@ -49,6 +49,17 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ return nil, fmt.Errorf("existing collection:%v unexpected input: %v", v.Collection, req.Collection) } + shouldCleanup := true + defer func() { + if !shouldCleanup { + 
return + } + for i := 0; i < erasure_coding.TotalShardsCount; i++ { + os.Remove(fmt.Sprintf("%s.ec%2d", baseFileName, i)) + } + os.Remove(v.IndexFileName() + ".ecx") + }() + // write .ec00 ~ .ec13 files if err := erasure_coding.WriteEcFiles(baseFileName); err != nil { return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) @@ -60,10 +71,12 @@ func (vs *VolumeServer) VolumeEcShardsGenerate(ctx context.Context, req *volume_ } // write .vif files - if err := pb.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil { + if err := volume_info.SaveVolumeInfo(baseFileName+".vif", &volume_server_pb.VolumeInfo{Version: uint32(v.Version())}); err != nil { return nil, fmt.Errorf("WriteEcFiles %s: %v", baseFileName, err) } + shouldCleanup = false + return &volume_server_pb.VolumeEcShardsGenerateResponse{}, nil } @@ -117,7 +130,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv // copy ec data slices for _, shardId := range req.ShardIds { - if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil { + if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, erasure_coding.ToExt(int(shardId)), false, false); err != nil { return err } } @@ -125,7 +138,7 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv if req.CopyEcxFile { // copy ecx file - if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil { + if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecx", false, false); err != nil { return err } return nil @@ -133,14 +146,14 @@ func (vs *VolumeServer) VolumeEcShardsCopy(ctx context.Context, req *volume_serv if req.CopyEcjFile { // copy ecj file - if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil { + if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, indexBaseFileName, ".ecj", true, true); err != nil { return err } } if req.CopyVifFile { // copy vif file - if err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil { + if _, err := vs.doCopyFile(client, true, req.Collection, req.VolumeId, math.MaxUint32, math.MaxInt64, dataBaseFileName, ".vif", false, true); err != nil { return err } } diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go index 5ca6619bd..aff57e52b 100644 --- a/weed/server/volume_grpc_remote.go +++ b/weed/server/volume_grpc_remote.go @@ -3,11 +3,14 @@ package weed_server import ( "context" "fmt" - "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" + "sync" + "time" ) func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err 
error) { @@ -17,40 +20,64 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser return nil, fmt.Errorf("not found volume id %d", req.VolumeId) } - remoteConf := &filer_pb.RemoteConf{ - Type: req.RemoteType, - Name: req.RemoteName, - S3AccessKey: req.S3AccessKey, - S3SecretKey: req.S3SecretKey, - S3Region: req.S3Region, - S3Endpoint: req.S3Endpoint, - } + remoteConf := req.RemoteConf client, getClientErr := remote_storage.GetRemoteStorage(remoteConf) if getClientErr != nil { return nil, fmt.Errorf("get remote client: %v", getClientErr) } - remoteStorageLocation := &filer_pb.RemoteStorageLocation{ - Name: req.RemoteName, - Bucket: req.RemoteBucket, - Path: req.RemotePath, - } + remoteStorageLocation := req.RemoteLocation + data, ReadRemoteErr := client.ReadFile(remoteStorageLocation, req.Offset, req.Size) if ReadRemoteErr != nil { return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr) } - n := new(needle.Needle) - n.Id = types.NeedleId(req.NeedleId) - n.Cookie = types.Cookie(req.Cookie) - n.Data, n.DataSize = data, uint32(len(data)) - // copied from *Needle.prepareWriteBuffer() - n.Size = 4 + types.Size(n.DataSize) + 1 - n.Checksum = needle.NewCRC(n.Data) - if _, err = vs.store.WriteVolumeNeedle(v.Id, n, true, false); err != nil { - return nil, fmt.Errorf("write needle %d size %d: %v", req.NeedleId, req.Size, err) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + n := new(needle.Needle) + n.Id = types.NeedleId(req.NeedleId) + n.Cookie = types.Cookie(req.Cookie) + n.Data, n.DataSize = data, uint32(len(data)) + // copied from *Needle.prepareWriteBuffer() + n.Size = 4 + types.Size(n.DataSize) + 1 + n.Checksum = needle.NewCRC(n.Data) + n.LastModified = uint64(time.Now().Unix()) + n.SetHasLastModifiedDate() + if _, localWriteErr := vs.store.WriteVolumeNeedle(v.Id, n, true, false); localWriteErr != nil { + if err == nil { + err = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, err) + } + } + }() + if len(req.Replicas)>0{ + fileId := needle.NewFileId(v.Id, req.NeedleId, req.Cookie) + for _, replica := range req.Replicas { + wg.Add(1) + go func(targetVolumeServer string) { + defer wg.Done() + uploadOption := &operation.UploadOption{ + UploadUrl: fmt.Sprintf("http://%s/%s?type=replicate", targetVolumeServer, fileId.String()), + Filename: "", + Cipher: false, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: security.EncodedJwt(req.Auth), + } + if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil { + if err == nil { + err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err) + } + } + }(replica.Url) + } } - return resp, nil + wg.Wait() + + return resp, err } diff --git a/weed/server/volume_server.go b/weed/server/volume_server.go index 034521b4b..9406b5601 100644 --- a/weed/server/volume_server.go +++ b/weed/server/volume_server.go @@ -1,7 +1,6 @@ package weed_server import ( - "fmt" "github.com/chrislusf/seaweedfs/weed/storage/types" "net/http" "sync" @@ -113,7 +112,7 @@ func NewVolumeServer(adminMux, publicMux *http.ServeMux, ip string, } go vs.heartbeat() - go stats.LoopPushingMetric("volumeServer", fmt.Sprintf("%s:%d", ip, port), vs.metricsAddress, vs.metricsIntervalSec) + go stats.LoopPushingMetric("volumeServer", util.JoinHostPort(ip, port), vs.metricsAddress, vs.metricsIntervalSec) return vs } diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 68c1f3233..e99d4a358 100644 --- 
a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -413,7 +413,16 @@ func (f *WebDavFile) saveDataAsChunk(reader io.Reader, name string, offset int64 } fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) - uploadResult, flushErr, _ := operation.Upload(fileUrl, f.name, f.fs.option.Cipher, reader, false, "", nil, auth) + uploadOption := &operation.UploadOption{ + UploadUrl: fileUrl, + Filename: f.name, + Cipher: f.fs.option.Cipher, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: auth, + } + uploadResult, flushErr, _ := operation.Upload(reader, uploadOption) if flushErr != nil { glog.V(0).Infof("upload data %v to %s: %v", f.name, fileUrl, flushErr) return nil, f.collection, f.replication, fmt.Errorf("upload data: %v", flushErr) |
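The first hunk in weed/server/common.go copies net/http's unexported bodyAllowedForStatus so that writeJson returns early instead of serializing a body for 1xx, 204, and 304 responses. Below is a minimal standalone sketch of the same guard; writeJSON here is a simplified stand-in for the filer helper, not the SeaweedFS function itself.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
)

// bodyAllowedForStatus mirrors net/http's unexported helper: 1xx, 204 and 304
// responses must not carry a message body.
func bodyAllowedForStatus(status int) bool {
	switch {
	case status >= 100 && status <= 199:
		return false
	case status == http.StatusNoContent:
		return false
	case status == http.StatusNotModified:
		return false
	}
	return true
}

// writeJSON is a simplified stand-in for the filer's writeJson helper.
func writeJSON(w http.ResponseWriter, status int, obj interface{}) error {
	if !bodyAllowedForStatus(status) {
		w.WriteHeader(status) // status line only, no body
		return nil
	}
	w.Header().Set("Content-Type", "application/json")
	w.WriteHeader(status)
	return json.NewEncoder(w).Encode(obj)
}

func main() {
	rec := httptest.NewRecorder()
	writeJSON(rec, http.StatusNoContent, map[string]string{"ignored": "body"})
	fmt.Println(rec.Code, "body bytes:", rec.Body.Len()) // 204 body bytes: 0
}
```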
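Several hunks (common.go, the filer write handlers, webdav_server.go) replace the long positional parameter lists of operation.Upload and operation.UploadData with a single operation.UploadOption struct. The sketch below shows what a call looks like after this change, assuming the import path and field names exactly as they appear in this diff; the upload URL is a placeholder and the call only succeeds against a running volume server.

```go
package main

import (
	"log"

	"github.com/chrislusf/seaweedfs/weed/operation"
)

func main() {
	data := []byte("hello, volume server")

	// All the former positional arguments now travel in one options struct.
	uploadOption := &operation.UploadOption{
		UploadUrl:         "http://127.0.0.1:8080/3,01637037d6", // placeholder fid URL from a prior Assign
		Filename:          "hello.txt",
		Cipher:            false,
		IsInputCompressed: false,
		MimeType:          "text/plain",
		PairMap:           nil,
		Jwt:               "", // a security.EncodedJwt from the assign result in real code
	}

	result, err := operation.UploadData(data, uploadOption)
	if err != nil {
		log.Fatalf("upload: %v", err)
	}
	log.Printf("upload result: %+v", result)
}
```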
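fmt.Sprintf("%s:%d", host, port) is replaced throughout with util.JoinHostPort. The helper's body is not part of this diff; the sketch below assumes it behaves like the standard library's net.JoinHostPort, which brackets IPv6 literals so the resulting address stays parseable.

```go
package main

import (
	"fmt"
	"net"
	"strconv"
)

// joinHostPort is a sketch of a util.JoinHostPort-style helper: it defers to
// net.JoinHostPort so IPv6 hosts come out as "[::1]:19333" instead of "::1:19333".
func joinHostPort(host string, port int) string {
	return net.JoinHostPort(host, strconv.Itoa(port))
}

func main() {
	fmt.Println(joinHostPort("localhost", 9333)) // localhost:9333
	fmt.Println(joinHostPort("::1", 19333))      // [::1]:19333
}
```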
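In DownloadToLocal the chunk fetch loop now pairs util.NewLimitedConcurrentExecutor(8) with a sync.WaitGroup, and the handler waits for all fetches before returning fetchAndWriteErr. Below is a generic sketch of that bounded fan-out, with a buffered-channel semaphore standing in for the SeaweedFS executor; the mutex around the shared error is an addition for the sketch, and fetchChunk is a stub.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	const (
		remoteSize = int64(10 << 20) // pretend 10 MiB remote object
		chunkSize  = int64(2 << 20)
		workers    = 8
	)

	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	sem := make(chan struct{}, workers) // stands in for util.NewLimitedConcurrentExecutor(8)

	for offset := int64(0); offset < remoteSize; offset += chunkSize {
		localOffset := offset // capture the per-iteration value, as the diff does
		wg.Add(1)
		sem <- struct{}{}
		go func() {
			defer wg.Done()
			defer func() { <-sem }()
			size := chunkSize
			if localOffset+chunkSize > remoteSize {
				size = remoteSize - localOffset
			}
			if err := fetchChunk(localOffset, size); err != nil {
				mu.Lock()
				if firstErr == nil {
					firstErr = err
				}
				mu.Unlock()
			}
		}()
	}

	wg.Wait() // the new wg.Wait() before the error check and entry update
	if firstErr != nil {
		fmt.Println("download failed:", firstErr)
		return
	}
	fmt.Println("all chunks fetched")
}

func fetchChunk(offset, size int64) error {
	fmt.Printf("fetching %d bytes at offset %d\n", size, offset)
	return nil
}
```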
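The metadata subscription filter in filer_grpc_server_sub_meta.go now accepts a list of PathPrefixes in addition to the single PathPrefix, via the new hasPrefixIn helper. Here is a small usage sketch built around the helper exactly as added in this diff; the sample paths are made up.

```go
package main

import (
	"fmt"
	"strings"
)

// hasPrefixIn reports whether text starts with any of the given prefixes,
// as added in filer_grpc_server_sub_meta.go.
func hasPrefixIn(text string, prefixes []string) bool {
	for _, p := range prefixes {
		if strings.HasPrefix(text, p) {
			return true
		}
	}
	return false
}

func main() {
	prefixes := []string{"/buckets/images/", "/buckets/videos/"}
	for _, fullpath := range []string{"/buckets/images/cat.jpg", "/topics/log"} {
		fmt.Println(fullpath, "matches:", hasPrefixIn(fullpath, prefixes))
	}
}
```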
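filer_server_rocksdb.go gains the Go 1.17 //go:build constraint next to the legacy // +build line. Both forms must agree and sit above the package clause with a blank line in between, for example:

```go
//go:build rocksdb
// +build rocksdb

// This file only compiles when the "rocksdb" build tag is set,
// e.g. `go build -tags rocksdb ./...`.
package weed_server
```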
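In master_grpc_server.go each SendHeartbeat stream now increments dn.Counter when it registers a volume server and decrements it in the deferred cleanup, so a node that disconnects and immediately reconnects is not unregistered by the stale stream's defer. Below is a self-contained sketch of that reference-counting idea; the topology and dataNode types are simplified stand-ins, not the SeaweedFS ones.

```go
package main

import (
	"fmt"
	"sync"
)

// dataNode is a simplified stand-in for the topology's DataNode.
type dataNode struct {
	id      string
	counter int // number of live heartbeat streams, like dn.Counter in the diff
}

type topology struct {
	mu    sync.Mutex
	nodes map[string]*dataNode
}

// connect registers a heartbeat stream, creating the node if needed.
func (t *topology) connect(id string) *dataNode {
	t.mu.Lock()
	defer t.mu.Unlock()
	dn, ok := t.nodes[id]
	if !ok {
		dn = &dataNode{id: id}
		t.nodes[id] = dn
	}
	dn.counter++
	return dn
}

// disconnect is what each stream's defer runs; only the last stream
// actually unregisters the node.
func (t *topology) disconnect(dn *dataNode) {
	t.mu.Lock()
	defer t.mu.Unlock()
	dn.counter--
	if dn.counter > 0 {
		fmt.Printf("phantom disconnect of %s, %d stream(s) remain\n", dn.id, dn.counter)
		return
	}
	delete(t.nodes, dn.id) // the UnRegisterDataNode equivalent
	fmt.Printf("unregistered %s\n", dn.id)
}

func main() {
	topo := &topology{nodes: map[string]*dataNode{}}
	stale := topo.connect("vol1:8080") // original stream
	fresh := topo.connect("vol1:8080") // quick reconnect, same node
	topo.disconnect(stale)             // stale defer runs: node is kept
	topo.disconnect(fresh)             // last stream gone: node is removed
}
```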
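volume_grpc_copy.go now preserves modification times: CopyFile sends the source file's ModTime as ModifiedTsNs on the first chunk only, writeToFile returns it, and VolumeCopy applies it with os.Chtimes. The sketch below models the receiving side under those assumptions, with the gRPC stream replaced by a slice of fake responses.

```go
package main

import (
	"fmt"
	"os"
	"time"
)

// copyFileResponse mirrors the two fields of volume_server_pb.CopyFileResponse
// used here: the chunk payload and the source mtime, sent once.
type copyFileResponse struct {
	FileContent  []byte
	ModifiedTsNs int64
}

// writeChunks plays the role of writeToFile: it writes every chunk and
// remembers the non-zero ModifiedTsNs it saw.
func writeChunks(fileName string, responses []copyFileResponse) (modifiedTsNs int64, err error) {
	dst, err := os.OpenFile(fileName, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0644)
	if err != nil {
		return 0, err
	}
	defer dst.Close()
	for _, resp := range responses {
		if resp.ModifiedTsNs != 0 {
			modifiedTsNs = resp.ModifiedTsNs
		}
		if _, err := dst.Write(resp.FileContent); err != nil {
			return modifiedTsNs, err
		}
	}
	return modifiedTsNs, nil
}

func main() {
	src := time.Now().Add(-48 * time.Hour).UnixNano() // pretend source mtime
	responses := []copyFileResponse{
		{FileContent: []byte("chunk-1"), ModifiedTsNs: src}, // mtime only on the first chunk
		{FileContent: []byte("chunk-2")},
	}
	modifiedTsNs, err := writeChunks("copy.dat", responses)
	if err != nil {
		panic(err)
	}
	if modifiedTsNs > 0 { // same guard as VolumeCopy before os.Chtimes
		t := time.Unix(0, modifiedTsNs)
		if err := os.Chtimes("copy.dat", t, t); err != nil {
			panic(err)
		}
	}
	fmt.Println("copied with preserved mtime:", time.Unix(0, modifiedTsNs))
}
```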
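VolumeEcShardsGenerate now removes partially written .ec00 to .ec13 shards and the .ecx index when generation fails, using a shouldCleanup flag that the success path clears right before returning. Below is a generic sketch of that defer-with-flag pattern; the generate steps are stubs, the shard count of 14 matches erasure_coding.TotalShardsCount, and the sketch zero-pads the shard index to match the .ec00 naming.

```go
package main

import (
	"fmt"
	"os"
)

func generateShards(baseFileName string) error {
	shouldCleanup := true
	defer func() {
		if !shouldCleanup {
			return
		}
		// Something failed above: remove whatever shard files were produced.
		for i := 0; i < 14; i++ {
			os.Remove(fmt.Sprintf("%s.ec%02d", baseFileName, i))
		}
		os.Remove(baseFileName + ".ecx")
	}()

	if err := writeEcFiles(baseFileName); err != nil {
		return fmt.Errorf("write ec files: %v", err)
	}
	if err := writeIndex(baseFileName); err != nil {
		return fmt.Errorf("write ecx index: %v", err)
	}

	shouldCleanup = false // success: keep the generated files
	return nil
}

func writeEcFiles(base string) error { return nil } // stand-ins for the real EC steps
func writeIndex(base string) error   { return nil }

func main() {
	if err := generateShards("/tmp/1"); err != nil {
		fmt.Println("generate failed:", err)
		return
	}
	fmt.Println("ec shards generated")
}
```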
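FetchAndWriteNeedle in volume_grpc_remote.go now receives RemoteConf and RemoteLocation as protobuf messages, reads the remote data once, then writes the needle locally and uploads the same bytes to each replica in parallel goroutines joined by a WaitGroup. Here is a reduced sketch of that fan-out; the local write and replica upload are stubs, and the mutex-guarded first error is a simplification added for the sketch.

```go
package main

import (
	"fmt"
	"sync"
)

func fanOutWrite(data []byte, replicas []string) error {
	var (
		wg       sync.WaitGroup
		mu       sync.Mutex
		firstErr error
	)
	record := func(err error) {
		if err == nil {
			return
		}
		mu.Lock()
		if firstErr == nil {
			firstErr = err
		}
		mu.Unlock()
	}

	// Local write runs concurrently with the replica uploads, as in the diff.
	wg.Add(1)
	go func() {
		defer wg.Done()
		record(writeLocalNeedle(data))
	}()

	for _, replica := range replicas {
		wg.Add(1)
		go func(target string) {
			defer wg.Done()
			record(uploadToReplica(target, data))
		}(replica)
	}

	wg.Wait()
	return firstErr
}

func writeLocalNeedle(data []byte) error { // stands in for store.WriteVolumeNeedle
	fmt.Printf("wrote %d bytes locally\n", len(data))
	return nil
}

func uploadToReplica(target string, data []byte) error { // stands in for operation.UploadData
	fmt.Printf("replicated %d bytes to %s\n", len(data), target)
	return nil
}

func main() {
	if err := fanOutWrite([]byte("needle-bytes"), []string{"10.0.0.2:8080", "10.0.0.3:8080"}); err != nil {
		fmt.Println("fetch-and-write failed:", err)
	}
}
```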
