diff options
Diffstat (limited to 'weed/server')
| -rw-r--r-- | weed/server/filer_grpc_server.go | 7 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write.go | 12 | ||||
| -rw-r--r-- | weed/server/filer_server_handlers_write_upload.go | 1 | ||||
| -rw-r--r-- | weed/server/master_grpc_server.go | 6 | ||||
| -rw-r--r-- | weed/server/master_grpc_server_volume.go | 24 | ||||
| -rw-r--r-- | weed/server/master_server_handlers.go | 8 | ||||
| -rw-r--r-- | weed/server/webdav_server.go | 3 |
7 files changed, 39 insertions, 22 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go index aeb288f48..5b07ace07 100644 --- a/weed/server/filer_grpc_server.go +++ b/weed/server/filer_grpc_server.go @@ -105,9 +105,10 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol } for _, loc := range locations { locs = append(locs, &filer_pb.Location{ - Url: loc.Url, - PublicUrl: loc.PublicUrl, - GrpcPort: uint32(loc.GrpcPort), + Url: loc.Url, + PublicUrl: loc.PublicUrl, + GrpcPort: uint32(loc.GrpcPort), + DataCenter: loc.DataCenter, }) } resp.LocationsMap[vidString] = &filer_pb.Locations{ diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go index 96f0eb81c..fc767dd9e 100644 --- a/weed/server/filer_server_handlers_write.go +++ b/weed/server/filer_server_handlers_write.go @@ -49,7 +49,17 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u return } fileId = assignResult.Fid - urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid + assignUrl := assignResult.Url + // Prefer same data center + if fs.option.DataCenter != "" { + for _, repl := range assignResult.Replicas { + if repl.DataCenter == fs.option.DataCenter { + assignUrl = repl.Url + break + } + } + } + urlLocation = "http://" + assignUrl + "/" + assignResult.Fid if so.Fsync { urlLocation += "?fsync=true" } diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go index 718d29313..846454cc2 100644 --- a/weed/server/filer_server_handlers_write_upload.go +++ b/weed/server/filer_server_handlers_write_upload.go @@ -180,7 +180,6 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch time.Sleep(time.Duration(i+1) * 251 * time.Millisecond) continue } - // upload the chunk to the volume server uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth) if uploadErr != nil { diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go index 975aab1c1..e917909ae 100644 --- a/weed/server/master_grpc_server.go +++ b/weed/server/master_grpc_server.go @@ -137,14 +137,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ glog.V(4).Infof("master received heartbeat %s", heartbeat.String()) stats.MasterReceivedHeartbeatCounter.WithLabelValues("total").Inc() - var dataCenter string - if dc := dn.GetDataCenter(); dc != nil { - dataCenter = string(dc.Id()) - } message := &master_pb.VolumeLocation{ Url: dn.Url(), PublicUrl: dn.PublicUrl, - DataCenter: dataCenter, + DataCenter: dn.GetDataCenterId(), } if len(heartbeat.NewVolumes) > 0 { stats.FilerRequestCounter.WithLabelValues("newVolumes").Inc() diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go index fb13ebd32..d11a98f93 100644 --- a/weed/server/master_grpc_server_volume.go +++ b/weed/server/master_grpc_server_volume.go @@ -85,8 +85,9 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV var locations []*master_pb.Location for _, loc := range result.Locations { locations = append(locations, &master_pb.Location{ - Url: loc.Url, - PublicUrl: loc.PublicUrl, + Url: loc.Url, + PublicUrl: loc.PublicUrl, + DataCenter: loc.DataCenter, }) } var auth string @@ -165,17 +166,19 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest var replicas []*master_pb.Location for _, r := range dnList.Rest() { replicas = append(replicas, &master_pb.Location{ - Url: r.Url(), - PublicUrl: r.PublicUrl, - GrpcPort: uint32(r.GrpcPort), + Url: r.Url(), + PublicUrl: r.PublicUrl, + GrpcPort: uint32(r.GrpcPort), + DataCenter: r.GetDataCenterId(), }) } return &master_pb.AssignResponse{ Fid: fid, Location: &master_pb.Location{ - Url: dn.Url(), - PublicUrl: dn.PublicUrl, - GrpcPort: uint32(dn.GrpcPort), + Url: dn.Url(), + PublicUrl: dn.PublicUrl, + GrpcPort: uint32(dn.GrpcPort), + DataCenter: dn.GetDataCenterId(), }, Count: count, Auth: string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)), @@ -253,8 +256,9 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku var locations []*master_pb.Location for _, dn := range shardLocations { locations = append(locations, &master_pb.Location{ - Url: string(dn.Id()), - PublicUrl: dn.PublicUrl, + Url: string(dn.Id()), + PublicUrl: dn.PublicUrl, + DataCenter: dn.GetDataCenterId(), }) } resp.ShardIdLocations = append(resp.ShardIdLocations, &master_pb.LookupEcVolumeResponse_EcShardIdLocation{ diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go index 115affe71..2f2fa199d 100644 --- a/weed/server/master_server_handlers.go +++ b/weed/server/master_server_handlers.go @@ -72,13 +72,17 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo } else { machines := ms.Topo.Lookup(collection, volumeId) for _, loc := range machines { - locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl}) + locations = append(locations, operation.Location{ + Url: loc.Url(), PublicUrl: loc.PublicUrl, DataCenter: loc.GetDataCenterId(), + }) } } } else { machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid) for _, loc := range machines { - locations = append(locations, operation.Location{Url: loc.Url, PublicUrl: loc.PublicUrl}) + locations = append(locations, operation.Location{ + Url: loc.Url, PublicUrl: loc.PublicUrl, DataCenter: loc.DataCenter, + }) } err = getVidLocationsErr } diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go index 9b5039901..475458808 100644 --- a/weed/server/webdav_server.go +++ b/weed/server/webdav_server.go @@ -137,6 +137,9 @@ func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string { return location.Url } +func (fs *WebDavFileSystem) GetDataCenter() string { + return fs.filer.MasterClient.DataCenter +} func clearName(name string) (string, error) { slashed := strings.HasSuffix(name, "/") |
