aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/server/volume_grpc_tail.go2
-rw-r--r--weed/server/volume_server_handlers_write.go9
-rw-r--r--weed/storage/store.go4
-rw-r--r--weed/storage/volume_read_write.go4
-rw-r--r--weed/storage/volume_vacuum_test.go2
-rw-r--r--weed/topology/store_replicate.go14
6 files changed, 18 insertions, 17 deletions
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index 820c28a1d..667131e9f 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -111,7 +111,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
- _, err := vs.store.Write(v.Id, n)
+ _, _, err := vs.store.Write(v.Id, n)
return err
})
diff --git a/weed/server/volume_server_handlers_write.go b/weed/server/volume_server_handlers_write.go
index 45c868c33..188d88ddf 100644
--- a/weed/server/volume_server_handlers_write.go
+++ b/weed/server/volume_server_handlers_write.go
@@ -41,11 +41,14 @@ func (vs *VolumeServer) PostHandler(w http.ResponseWriter, r *http.Request) {
}
ret := operation.UploadResult{}
- _, errorStatus := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
+ _, isUnchanged, writeError := topology.ReplicatedWrite(vs.GetMaster(), vs.store, volumeId, needle, r)
httpStatus := http.StatusCreated
- if errorStatus != "" {
+ if isUnchanged {
+ httpStatus = http.StatusNotModified
+ }
+ if writeError != nil {
httpStatus = http.StatusInternalServerError
- ret.Error = errorStatus
+ ret.Error = writeError.Error()
}
if needle.HasName() {
ret.Name = string(needle.Name)
diff --git a/weed/storage/store.go b/weed/storage/store.go
index b870cee4e..3b2bb2c34 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -192,7 +192,7 @@ func (s *Store) Close() {
}
}
-func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, err error) {
+func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
if v := s.findVolume(i); v != nil {
if v.readOnly {
err = fmt.Errorf("Volume %d is read only", i)
@@ -200,7 +200,7 @@ func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, err err
}
// TODO: count needle size ahead
if MaxPossibleVolumeSize >= v.ContentSize()+uint64(size) {
- _, size, err = v.writeNeedle(n)
+ _, size, isUnchanged, err = v.writeNeedle(n)
} else {
err = fmt.Errorf("Volume Size Limit %d Exceeded! Current size is %d", s.GetVolumeSizeLimit(), v.ContentSize())
}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 6899ebbc1..bd268c1a6 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -77,7 +77,7 @@ func (v *Volume) AppendBlob(b []byte) (offset int64, err error) {
return
}
-func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, err error) {
+func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUnchanged bool, err error) {
glog.V(4).Infof("writing needle %s", needle.NewFileIdFromNeedle(v.Id, n).String())
if v.readOnly {
err = fmt.Errorf("%s is read-only", v.dataFile.Name())
@@ -87,7 +87,7 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, err
defer v.dataFileAccessLock.Unlock()
if v.isFileUnchanged(n) {
size = n.DataSize
- glog.V(4).Infof("needle is unchanged!")
+ isUnchanged = true
return
}
diff --git a/weed/storage/volume_vacuum_test.go b/weed/storage/volume_vacuum_test.go
index 5192d23b8..d038eeda3 100644
--- a/weed/storage/volume_vacuum_test.go
+++ b/weed/storage/volume_vacuum_test.go
@@ -124,7 +124,7 @@ func TestCompaction(t *testing.T) {
}
func doSomeWritesDeletes(i int, v *Volume, t *testing.T, infos []*needleInfo) {
n := newRandomNeedle(uint64(i))
- _, size, err := v.writeNeedle(n)
+ _, size, _, err := v.writeNeedle(n)
if err != nil {
t.Fatalf("write file %d: %v", i, err)
}
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index fd19cbfba..d4076d548 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -20,19 +20,18 @@ import (
func ReplicatedWrite(masterNode string, s *storage.Store,
volumeId needle.VolumeId, n *needle.Needle,
- r *http.Request) (size uint32, errorStatus string) {
+ r *http.Request) (size uint32, isUnchanged bool, err error) {
//check JWT
jwt := security.GetJwt(r)
- ret, err := s.Write(volumeId, n)
- needToReplicate := !s.HasVolume(volumeId)
+ size, isUnchanged, err = s.Write(volumeId, n)
if err != nil {
- errorStatus = "Failed to write to local disk (" + err.Error() + ")"
- size = ret
+ err = fmt.Errorf("failed to write to local disk: %v", err)
return
}
+ needToReplicate := !s.HasVolume(volumeId)
needToReplicate = needToReplicate || s.GetVolume(volumeId).NeedToReplicate()
if !needToReplicate {
needToReplicate = s.GetVolume(volumeId).NeedToReplicate()
@@ -75,12 +74,11 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
pairMap, jwt)
return err
}); err != nil {
- ret = 0
- errorStatus = fmt.Sprintf("Failed to write to replicas for volume %d: %v", volumeId, err)
+ size = 0
+ err = fmt.Errorf("failed to write to replicas for volume %d: %v", volumeId, err)
}
}
}
- size = ret
return
}