aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/notification/gocdk_pub_sub/gocdk_pub_sub.go2
-rw-r--r--weed/replication/sub/notification_gocdk_pub_sub.go2
-rw-r--r--weed/server/master_grpc_server.go3
-rw-r--r--weed/server/volume_grpc_batch_delete.go2
-rw-r--r--weed/server/volume_grpc_tail.go2
-rw-r--r--weed/storage/erasure_coding/ec_shard.go2
-rw-r--r--weed/storage/store.go4
-rw-r--r--weed/storage/volume_read_write.go19
-rw-r--r--weed/storage/volume_vacuum.go2
-rw-r--r--weed/topology/store_replicate.go4
10 files changed, 30 insertions, 12 deletions
diff --git a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
index 94a413ac0..ebf44ea6f 100644
--- a/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
+++ b/weed/notification/gocdk_pub_sub/gocdk_pub_sub.go
@@ -24,7 +24,7 @@ import (
"github.com/golang/protobuf/proto"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
- _ "gocloud.dev/pubsub/azuresb"
+ // _ "gocloud.dev/pubsub/azuresb"
_ "gocloud.dev/pubsub/gcppubsub"
_ "gocloud.dev/pubsub/natspubsub"
_ "gocloud.dev/pubsub/rabbitpubsub"
diff --git a/weed/replication/sub/notification_gocdk_pub_sub.go b/weed/replication/sub/notification_gocdk_pub_sub.go
index 9c76e6918..eddba9ff8 100644
--- a/weed/replication/sub/notification_gocdk_pub_sub.go
+++ b/weed/replication/sub/notification_gocdk_pub_sub.go
@@ -9,7 +9,7 @@ import (
"github.com/golang/protobuf/proto"
"gocloud.dev/pubsub"
_ "gocloud.dev/pubsub/awssnssqs"
- _ "gocloud.dev/pubsub/azuresb"
+ // _ "gocloud.dev/pubsub/azuresb"
_ "gocloud.dev/pubsub/gcppubsub"
_ "gocloud.dev/pubsub/natspubsub"
_ "gocloud.dev/pubsub/rabbitpubsub"
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index 1a17327a0..e0d1fd174 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -52,8 +52,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
return err
}
+ t.Sequence.SetMax(heartbeat.MaxFileKey)
+
if dn == nil {
- t.Sequence.SetMax(heartbeat.MaxFileKey)
if heartbeat.Ip == "" {
if pr, ok := peer.FromContext(stream.Context()); ok {
if pr.Addr != net.Addr(nil) {
diff --git a/weed/server/volume_grpc_batch_delete.go b/weed/server/volume_grpc_batch_delete.go
index d7fbb6edf..fdb7937d2 100644
--- a/weed/server/volume_grpc_batch_delete.go
+++ b/weed/server/volume_grpc_batch_delete.go
@@ -58,7 +58,7 @@ func (vs *VolumeServer) BatchDelete(ctx context.Context, req *volume_server_pb.B
break
}
n.LastModified = now
- if size, err := vs.store.Delete(volumeId, n); err != nil {
+ if size, err := vs.store.DeleteVolumeNeedle(volumeId, n); err != nil {
resp.Results = append(resp.Results, &volume_server_pb.DeleteResult{
FileId: fid,
Status: http.StatusInternalServerError,
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index 698bad5b8..34c55a599 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -110,7 +110,7 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
defer glog.V(1).Infof("receive tailing volume %d finished", v.Id)
return resp, operation.TailVolumeFromSource(req.SourceVolumeServer, vs.grpcDialOption, v.Id, req.SinceNs, int(req.IdleTimeoutSeconds), func(n *needle.Needle) error {
- _, _, err := vs.store.Write(v.Id, n)
+ _, _, err := vs.store.WriteVolumeNeedle(v.Id, n)
return err
})
diff --git a/weed/storage/erasure_coding/ec_shard.go b/weed/storage/erasure_coding/ec_shard.go
index b280157b8..47e6d3d1e 100644
--- a/weed/storage/erasure_coding/ec_shard.go
+++ b/weed/storage/erasure_coding/ec_shard.go
@@ -81,7 +81,7 @@ func (shard *EcVolumeShard) Close() {
func (shard *EcVolumeShard) Destroy() {
os.Remove(shard.FileName() + ToExt(int(shard.ShardId)))
- stats.VolumeServerVolumeCounter.WithLabelValues(shard.Collection, "ec_shards").Inc()
+ stats.VolumeServerVolumeCounter.WithLabelValues(shard.Collection, "ec_shards").Dec()
}
func (shard *EcVolumeShard) ReadAt(buf []byte, offset int64) (int, error) {
diff --git a/weed/storage/store.go b/weed/storage/store.go
index d2dd76d52..e99d77774 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -211,7 +211,7 @@ func (s *Store) Close() {
}
}
-func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
+func (s *Store) WriteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (size uint32, isUnchanged bool, err error) {
if v := s.findVolume(i); v != nil {
if v.readOnly {
err = fmt.Errorf("volume %d is read only", i)
@@ -230,7 +230,7 @@ func (s *Store) Write(i needle.VolumeId, n *needle.Needle) (size uint32, isUncha
return
}
-func (s *Store) Delete(i needle.VolumeId, n *needle.Needle) (uint32, error) {
+func (s *Store) DeleteVolumeNeedle(i needle.VolumeId, n *needle.Needle) (uint32, error) {
if v := s.findVolume(i); v != nil && !v.readOnly {
return v.deleteNeedle(n)
}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 93ce1eab9..0327e5a1f 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -96,13 +96,29 @@ func (v *Volume) writeNeedle(n *needle.Needle) (offset uint64, size uint32, isUn
n.Ttl = v.Ttl
}
+ // check whether existing needle cookie matches
+ nv, ok := v.nm.Get(n.Id)
+ if ok {
+ existingNeedle, _, _, existingNeedleReadErr := needle.ReadNeedleHeader(v.dataFile, v.Version(), nv.Offset.ToAcutalOffset())
+ if existingNeedleReadErr != nil {
+ err = fmt.Errorf("reading existing needle: %v", existingNeedleReadErr)
+ return
+ }
+ if existingNeedle.Cookie != n.Cookie {
+ glog.V(0).Infof("write cookie mismatch: existing %x, new %x", existingNeedle.Cookie, n.Cookie)
+ err = fmt.Errorf("mismatching cookie %x", n.Cookie)
+ return
+ }
+ }
+
+ // append to dat file
n.AppendAtNs = uint64(time.Now().UnixNano())
if offset, size, _, err = n.Append(v.dataFile, v.Version()); err != nil {
return
}
v.lastAppendAtNs = n.AppendAtNs
- nv, ok := v.nm.Get(n.Id)
+ // add to needle map
if !ok || uint64(nv.Offset.ToAcutalOffset()) < offset {
if err = v.nm.Put(n.Id, ToOffset(int64(offset)), n.Size); err != nil {
glog.V(4).Infof("failed to save in needle map %d: %v", n.Id, err)
@@ -224,6 +240,7 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64,
}
if err != nil {
glog.V(0).Infof("visit needle error: %v", err)
+ return fmt.Errorf("visit needle error: %v", err)
}
offset += NeedleHeaderSize + rest
glog.V(4).Infof("==> new entry offset %d", offset)
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index cc7c7d6e6..3bb306649 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -53,7 +53,7 @@ func (v *Volume) CommitCompact() error {
glog.V(0).Infof("fail to close volume %d", v.Id)
}
v.dataFile = nil
- stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Inc()
+ stats.VolumeServerVolumeCounter.WithLabelValues(v.Collection, "volume").Dec()
var e error
if e = v.makeupDiff(v.FileName()+".cpd", v.FileName()+".cpx", v.FileName()+".dat", v.FileName()+".idx"); e != nil {
diff --git a/weed/topology/store_replicate.go b/weed/topology/store_replicate.go
index d4076d548..d21c4d210 100644
--- a/weed/topology/store_replicate.go
+++ b/weed/topology/store_replicate.go
@@ -25,7 +25,7 @@ func ReplicatedWrite(masterNode string, s *storage.Store,
//check JWT
jwt := security.GetJwt(r)
- size, isUnchanged, err = s.Write(volumeId, n)
+ size, isUnchanged, err = s.WriteVolumeNeedle(volumeId, n)
if err != nil {
err = fmt.Errorf("failed to write to local disk: %v", err)
return
@@ -89,7 +89,7 @@ func ReplicatedDelete(masterNode string, store *storage.Store,
//check JWT
jwt := security.GetJwt(r)
- ret, err := store.Delete(volumeId, n)
+ ret, err := store.DeleteVolumeNeedle(volumeId, n)
if err != nil {
glog.V(0).Infoln("delete error:", err)
return ret, err