aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/filer_grpc_server.go7
-rw-r--r--weed/server/filer_server_handlers_write.go12
-rw-r--r--weed/server/filer_server_handlers_write_upload.go1
-rw-r--r--weed/server/master_grpc_server.go6
-rw-r--r--weed/server/master_grpc_server_volume.go24
-rw-r--r--weed/server/master_server_handlers.go8
-rw-r--r--weed/server/webdav_server.go3
7 files changed, 39 insertions, 22 deletions
diff --git a/weed/server/filer_grpc_server.go b/weed/server/filer_grpc_server.go
index aeb288f48..5b07ace07 100644
--- a/weed/server/filer_grpc_server.go
+++ b/weed/server/filer_grpc_server.go
@@ -105,9 +105,10 @@ func (fs *FilerServer) LookupVolume(ctx context.Context, req *filer_pb.LookupVol
}
for _, loc := range locations {
locs = append(locs, &filer_pb.Location{
- Url: loc.Url,
- PublicUrl: loc.PublicUrl,
- GrpcPort: uint32(loc.GrpcPort),
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ GrpcPort: uint32(loc.GrpcPort),
+ DataCenter: loc.DataCenter,
})
}
resp.LocationsMap[vidString] = &filer_pb.Locations{
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 96f0eb81c..fc767dd9e 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -49,7 +49,17 @@ func (fs *FilerServer) assignNewFileInfo(so *operation.StorageOption) (fileId, u
return
}
fileId = assignResult.Fid
- urlLocation = "http://" + assignResult.Url + "/" + assignResult.Fid
+ assignUrl := assignResult.Url
+ // Prefer same data center
+ if fs.option.DataCenter != "" {
+ for _, repl := range assignResult.Replicas {
+ if repl.DataCenter == fs.option.DataCenter {
+ assignUrl = repl.Url
+ break
+ }
+ }
+ }
+ urlLocation = "http://" + assignUrl + "/" + assignResult.Fid
if so.Fsync {
urlLocation += "?fsync=true"
}
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 718d29313..846454cc2 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -180,7 +180,6 @@ func (fs *FilerServer) dataToChunk(fileName, contentType string, data []byte, ch
time.Sleep(time.Duration(i+1) * 251 * time.Millisecond)
continue
}
-
// upload the chunk to the volume server
uploadResult, uploadErr, _ = fs.doUpload(urlLocation, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 975aab1c1..e917909ae 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -137,14 +137,10 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
stats.MasterReceivedHeartbeatCounter.WithLabelValues("total").Inc()
- var dataCenter string
- if dc := dn.GetDataCenter(); dc != nil {
- dataCenter = string(dc.Id())
- }
message := &master_pb.VolumeLocation{
Url: dn.Url(),
PublicUrl: dn.PublicUrl,
- DataCenter: dataCenter,
+ DataCenter: dn.GetDataCenterId(),
}
if len(heartbeat.NewVolumes) > 0 {
stats.FilerRequestCounter.WithLabelValues("newVolumes").Inc()
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index fb13ebd32..d11a98f93 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -85,8 +85,9 @@ func (ms *MasterServer) LookupVolume(ctx context.Context, req *master_pb.LookupV
var locations []*master_pb.Location
for _, loc := range result.Locations {
locations = append(locations, &master_pb.Location{
- Url: loc.Url,
- PublicUrl: loc.PublicUrl,
+ Url: loc.Url,
+ PublicUrl: loc.PublicUrl,
+ DataCenter: loc.DataCenter,
})
}
var auth string
@@ -165,17 +166,19 @@ func (ms *MasterServer) Assign(ctx context.Context, req *master_pb.AssignRequest
var replicas []*master_pb.Location
for _, r := range dnList.Rest() {
replicas = append(replicas, &master_pb.Location{
- Url: r.Url(),
- PublicUrl: r.PublicUrl,
- GrpcPort: uint32(r.GrpcPort),
+ Url: r.Url(),
+ PublicUrl: r.PublicUrl,
+ GrpcPort: uint32(r.GrpcPort),
+ DataCenter: r.GetDataCenterId(),
})
}
return &master_pb.AssignResponse{
Fid: fid,
Location: &master_pb.Location{
- Url: dn.Url(),
- PublicUrl: dn.PublicUrl,
- GrpcPort: uint32(dn.GrpcPort),
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
+ GrpcPort: uint32(dn.GrpcPort),
+ DataCenter: dn.GetDataCenterId(),
},
Count: count,
Auth: string(security.GenJwtForVolumeServer(ms.guard.SigningKey, ms.guard.ExpiresAfterSec, fid)),
@@ -253,8 +256,9 @@ func (ms *MasterServer) LookupEcVolume(ctx context.Context, req *master_pb.Looku
var locations []*master_pb.Location
for _, dn := range shardLocations {
locations = append(locations, &master_pb.Location{
- Url: string(dn.Id()),
- PublicUrl: dn.PublicUrl,
+ Url: string(dn.Id()),
+ PublicUrl: dn.PublicUrl,
+ DataCenter: dn.GetDataCenterId(),
})
}
resp.ShardIdLocations = append(resp.ShardIdLocations, &master_pb.LookupEcVolumeResponse_EcShardIdLocation{
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index 115affe71..2f2fa199d 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -72,13 +72,17 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo
} else {
machines := ms.Topo.Lookup(collection, volumeId)
for _, loc := range machines {
- locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
+ locations = append(locations, operation.Location{
+ Url: loc.Url(), PublicUrl: loc.PublicUrl, DataCenter: loc.GetDataCenterId(),
+ })
}
}
} else {
machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
for _, loc := range machines {
- locations = append(locations, operation.Location{Url: loc.Url, PublicUrl: loc.PublicUrl})
+ locations = append(locations, operation.Location{
+ Url: loc.Url, PublicUrl: loc.PublicUrl, DataCenter: loc.DataCenter,
+ })
}
err = getVidLocationsErr
}
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index 9b5039901..475458808 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -137,6 +137,9 @@ func (fs *WebDavFileSystem) WithFilerClient(streamingMode bool, fn func(filer_pb
func (fs *WebDavFileSystem) AdjustedUrl(location *filer_pb.Location) string {
return location.Url
}
+func (fs *WebDavFileSystem) GetDataCenter() string {
+ return fs.filer.MasterClient.DataCenter
+}
func clearName(name string) (string, error) {
slashed := strings.HasSuffix(name, "/")