aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-03-15 03:11:26 -0700
committerChris Lu <chris.lu@gmail.com>2020-03-15 03:11:26 -0700
commit560df51def0713522f311e971b74877c6a9fdcd7 (patch)
treed06743cc7f8d54506e4c3a5705d43ed57520e829
parent7edbee6f57f9e3ba90df46255d0b836fbf0f6976 (diff)
downloadseaweedfs-560df51def0713522f311e971b74877c6a9fdcd7.tar.xz
seaweedfs-560df51def0713522f311e971b74877c6a9fdcd7.zip
refactoring
-rw-r--r--weed/server/volume_grpc_tail.go2
-rw-r--r--weed/server/volume_server_handlers_write.go2
-rw-r--r--weed/storage/store.go7
-rw-r--r--weed/topology/store_replicate.go17
4 files changed, 14 insertions, 14 deletions
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index c26d6ed8f..c3b66c5e7 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -90,7 +90,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
- _, _, err := vs.store.WriteVolumeNeedle(v.Id, n)
+ _, err := vs.store.WriteVolumeNeedle(v.Id, n)
return err
})
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 101be4c43..56cebf50f 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -49,7 +49,7 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
ret := operation.UploadResult{}
- _, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
+ isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
// http 204 status code does not allow body
if writeError == nil && isUnchanged {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index e29680f6f..19dbcb70e 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -227,14 +227,15 @@ func (s *Store) Close() {
}
}
-func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
+func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (isUnchanged bool, err error) {
if v := s.findVolume(i); v != nil {
if v.noWriteOrDelete || v.noWriteCanDelete {
err = fmt.Errorf("volume %d is read only", i)
return
}
- if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(size, v.Version())) {
- _, size, isUnchanged, err = v.writeNeedle(n)
+ // using len(n.Data) here instead of n.Size before n.Size is populated in v.writeNeedle(n)
+ if MaxPossibleVolumeSize >= v.ContentSize()+uint64(needle.GetActualSize(uint32(len(n.Data)), v.Version())) {
+ _, _, isUnchanged, err = v.writeNeedle(n)
} else {
err = fmt.Errorf("volume size limit %d exceeded! current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
}
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index 495c38cfa..6f043a601 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -17,9 +17,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func ReplicatedWrite(masterNode string, s *storage.Store,
- volumeId needle.VolumeId, n *needle.Needle,
- r *http.Request) (size uint32, isUnchanged bool, err error) {
+func ReplicatedWrite(masterNode string, s *storage.Store, volumeId needle.VolumeId, n *needle.Needle, r *http.Request) (isUnchanged bool, err error) {
//check JWT
jwt := security.GetJwt(r)
@@ -33,11 +31,13 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
}
}
- size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n)
- if err != nil {
- err = fmt.Errorf("failed to write to local disk: %v", err)
- glog.V(0).Infoln(err)
- return
+ if s.GetVolume(volumeId) != nil {
+ isUnchanged, err = s.WriteVolumeNeedle(volumeId, n)
+ if err != nil {
+ err = fmt.Errorf("failed to write to local disk: %v", err)
+ glog.V(0).Infoln(err)
+ return
+ }
}
if len(remoteLocations) > 0 { //send to other replica locations
@@ -75,7 +75,6 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
_, err := operation.UploadData(u.String(), string(n.Name), false, n.Data, n.IsGzipped(), string(n.Mime), pairMap, jwt)
return err
}); err != nil {
- size = 0
err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
glog.V(0).Infoln(err)
}