aboutsummaryrefslogtreecommitdiff
path: root/weed/server/volume_grpc_remote.go
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2021-09-06 18:30:44 -0700
committerChris Lu <chris.lu@gmail.com>2021-09-06 18:30:44 -0700
commit2b1feb732c6bc986a5dbfc64cf69b171c13128ca (patch)
tree35b723f9bc16b8c824316e13adaaae77fbf86493 /weed/server/volume_grpc_remote.go
parent3adc3da29120dd9dc6627b5c7c528345d7c29028 (diff)
downloadseaweedfs-2b1feb732c6bc986a5dbfc64cf69b171c13128ca.tar.xz
seaweedfs-2b1feb732c6bc986a5dbfc64cf69b171c13128ca.zip
remote.cache supports replication
Diffstat (limited to 'weed/server/volume_grpc_remote.go')
-rw-r--r--weed/server/volume_grpc_remote.go55
1 files changed, 45 insertions, 10 deletions
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go
index 0615a96a1..de7f2d594 100644
--- a/weed/server/volume_grpc_remote.go
+++ b/weed/server/volume_grpc_remote.go
@@ -3,10 +3,13 @@ package weed_server
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb"
"github.com/chrislusf/seaweedfs/weed/remote_storage"
+ "github.com/chrislusf/seaweedfs/weed/security"
"github.com/chrislusf/seaweedfs/weed/storage/needle"
"github.com/chrislusf/seaweedfs/weed/storage/types"
+ "sync"
)
func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) {
@@ -30,16 +33,48 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser
return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr)
}
- n := new(needle.Needle)
- n.Id = types.NeedleId(req.NeedleId)
- n.Cookie = types.Cookie(req.Cookie)
- n.Data, n.DataSize = data, uint32(len(data))
- // copied from *Needle.prepareWriteBuffer()
- n.Size = 4 + types.Size(n.DataSize) + 1
- n.Checksum = needle.NewCRC(n.Data)
- if _, err = vs.store.WriteVolumeNeedle(v.Id, n, true, false); err != nil {
- return nil, fmt.Errorf("write needle %d size %d: %v", req.NeedleId, req.Size, err)
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go func() {
+ defer wg.Done()
+ n := new(needle.Needle)
+ n.Id = types.NeedleId(req.NeedleId)
+ n.Cookie = types.Cookie(req.Cookie)
+ n.Data, n.DataSize = data, uint32(len(data))
+ // copied from *Needle.prepareWriteBuffer()
+ n.Size = 4 + types.Size(n.DataSize) + 1
+ n.Checksum = needle.NewCRC(n.Data)
+ if _, localWriteErr := vs.store.WriteVolumeNeedle(v.Id, n, true, false); localWriteErr != nil {
+ if err == nil {
+ err = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, err)
+ }
+ }
+ }()
+ if len(req.Replicas)>0{
+ fileId := needle.NewFileId(v.Id, req.NeedleId, req.Cookie)
+ for _, replica := range req.Replicas {
+ wg.Add(1)
+ go func(targetVolumeServer string) {
+ defer wg.Done()
+ uploadOption := &operation.UploadOption{
+ UploadUrl: fmt.Sprintf("http://%s/%s?type=replicate", targetVolumeServer, fileId.String()),
+ Filename: "",
+ Cipher: false,
+ IsInputCompressed: false,
+ MimeType: "",
+ PairMap: nil,
+ Jwt: security.EncodedJwt(req.Auth),
+ }
+ if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil {
+ if err == nil {
+ err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err)
+ }
+ }
+ }(replica.Url)
+ }
}
- return resp, nil
+ wg.Wait()
+
+ return resp, err
}