diff options
| author | Chris Lu <chris.lu@gmail.com> | 2021-09-06 18:30:44 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2021-09-06 18:30:44 -0700 |
| commit | 2b1feb732c6bc986a5dbfc64cf69b171c13128ca (patch) | |
| tree | 35b723f9bc16b8c824316e13adaaae77fbf86493 /weed/server/volume_grpc_remote.go | |
| parent | 3adc3da29120dd9dc6627b5c7c528345d7c29028 (diff) | |
| download | seaweedfs-2b1feb732c6bc986a5dbfc64cf69b171c13128ca.tar.xz seaweedfs-2b1feb732c6bc986a5dbfc64cf69b171c13128ca.zip | |
remote.cache supports replication
Diffstat (limited to 'weed/server/volume_grpc_remote.go')
| -rw-r--r-- | weed/server/volume_grpc_remote.go | 55 |
1 files changed, 45 insertions, 10 deletions
diff --git a/weed/server/volume_grpc_remote.go b/weed/server/volume_grpc_remote.go index 0615a96a1..de7f2d594 100644 --- a/weed/server/volume_grpc_remote.go +++ b/weed/server/volume_grpc_remote.go @@ -3,10 +3,13 @@ package weed_server import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/volume_server_pb" "github.com/chrislusf/seaweedfs/weed/remote_storage" + "github.com/chrislusf/seaweedfs/weed/security" "github.com/chrislusf/seaweedfs/weed/storage/needle" "github.com/chrislusf/seaweedfs/weed/storage/types" + "sync" ) func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_server_pb.FetchAndWriteNeedleRequest) (resp *volume_server_pb.FetchAndWriteNeedleResponse, err error) { @@ -30,16 +33,48 @@ func (vs *VolumeServer) FetchAndWriteNeedle(ctx context.Context, req *volume_ser return nil, fmt.Errorf("read from remote %+v: %v", remoteStorageLocation, ReadRemoteErr) } - n := new(needle.Needle) - n.Id = types.NeedleId(req.NeedleId) - n.Cookie = types.Cookie(req.Cookie) - n.Data, n.DataSize = data, uint32(len(data)) - // copied from *Needle.prepareWriteBuffer() - n.Size = 4 + types.Size(n.DataSize) + 1 - n.Checksum = needle.NewCRC(n.Data) - if _, err = vs.store.WriteVolumeNeedle(v.Id, n, true, false); err != nil { - return nil, fmt.Errorf("write needle %d size %d: %v", req.NeedleId, req.Size, err) + var wg sync.WaitGroup + wg.Add(1) + go func() { + defer wg.Done() + n := new(needle.Needle) + n.Id = types.NeedleId(req.NeedleId) + n.Cookie = types.Cookie(req.Cookie) + n.Data, n.DataSize = data, uint32(len(data)) + // copied from *Needle.prepareWriteBuffer() + n.Size = 4 + types.Size(n.DataSize) + 1 + n.Checksum = needle.NewCRC(n.Data) + if _, localWriteErr := vs.store.WriteVolumeNeedle(v.Id, n, true, false); localWriteErr != nil { + if err == nil { + err = fmt.Errorf("local write needle %d size %d: %v", req.NeedleId, req.Size, err) + } + } + }() + if len(req.Replicas)>0{ + fileId := needle.NewFileId(v.Id, req.NeedleId, req.Cookie) + for _, replica := range req.Replicas { + wg.Add(1) + go func(targetVolumeServer string) { + defer wg.Done() + uploadOption := &operation.UploadOption{ + UploadUrl: fmt.Sprintf("http://%s/%s?type=replicate", targetVolumeServer, fileId.String()), + Filename: "", + Cipher: false, + IsInputCompressed: false, + MimeType: "", + PairMap: nil, + Jwt: security.EncodedJwt(req.Auth), + } + if _, replicaWriteErr := operation.UploadData(data, uploadOption); replicaWriteErr != nil { + if err == nil { + err = fmt.Errorf("remote write needle %d size %d: %v", req.NeedleId, req.Size, err) + } + } + }(replica.Url) + } } - return resp, nil + wg.Wait() + + return resp, err } |
