aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/sequence/memory_sequencer.go6
-rw-r--r--weed/sequence/sequence.go2
-rw-r--r--weed/server/master_server_handlers.go14
-rw-r--r--weed/server/master_server_handlers_admin.go11
-rw-r--r--weed/server/volume_grpc_erasure_coding.go10
-rw-r--r--weed/storage/store.go18
-rw-r--r--weed/storage/store_ec.go4
-rw-r--r--weed/topology/topology.go2
-rw-r--r--weed/topology/topology_map.go2
9 files changed, 48 insertions, 21 deletions
diff --git a/weed/sequence/memory_sequencer.go b/weed/sequence/memory_sequencer.go
index d727dc723..e20c29cc7 100644
--- a/weed/sequence/memory_sequencer.go
+++ b/weed/sequence/memory_sequencer.go
@@ -15,12 +15,12 @@ func NewMemorySequencer() (m *MemorySequencer) {
return
}
-func (m *MemorySequencer) NextFileId(count uint64) (uint64, uint64) {
+func (m *MemorySequencer) NextFileId(count uint64) uint64 {
m.sequenceLock.Lock()
defer m.sequenceLock.Unlock()
ret := m.counter
- m.counter += uint64(count)
- return ret, count
+ m.counter += count
+ return ret
}
func (m *MemorySequencer) SetMax(seenValue uint64) {
diff --git a/weed/sequence/sequence.go b/weed/sequence/sequence.go
index fbdc3b8ef..2258d001b 100644
--- a/weed/sequence/sequence.go
+++ b/weed/sequence/sequence.go
@@ -1,7 +1,7 @@
package sequence
type Sequencer interface {
- NextFileId(count uint64) (uint64, uint64)
+ NextFileId(count uint64) uint64
SetMax(uint64)
Peek() uint64
}
diff --git a/weed/server/master_server_handlers.go b/weed/server/master_server_handlers.go
index c10f9a5b7..9bcd35ced 100644
--- a/weed/server/master_server_handlers.go
+++ b/weed/server/master_server_handlers.go
@@ -65,11 +65,17 @@ func (ms *MasterServer) findVolumeLocation(collection, vid string) operation.Loo
var err error
if ms.Topo.IsLeader() {
volumeId, newVolumeIdErr := needle.NewVolumeId(vid)
- machines := ms.Topo.Lookup(collection, volumeId)
- for _, loc := range machines {
- locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
+ if newVolumeIdErr != nil {
+ err = fmt.Errorf("Unknown volume id %s", vid)
+ } else {
+ machines := ms.Topo.Lookup(collection, volumeId)
+ for _, loc := range machines {
+ locations = append(locations, operation.Location{Url: loc.Url(), PublicUrl: loc.PublicUrl})
+ }
+ if locations == nil {
+ err = fmt.Errorf("volume id %s not found", vid)
+ }
}
- err = newVolumeIdErr
} else {
machines, getVidLocationsErr := ms.MasterClient.GetVidLocations(vid)
for _, loc := range machines {
diff --git a/weed/server/master_server_handlers_admin.go b/weed/server/master_server_handlers_admin.go
index 486bf31f4..a5d976008 100644
--- a/weed/server/master_server_handlers_admin.go
+++ b/weed/server/master_server_handlers_admin.go
@@ -18,9 +18,10 @@ import (
)
func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.Request) {
- collection, ok := ms.Topo.FindCollection(r.FormValue("collection"))
+ collectionName := r.FormValue("collection")
+ collection, ok := ms.Topo.FindCollection(collectionName)
if !ok {
- writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", r.FormValue("collection")))
+ writeJsonError(w, r, http.StatusBadRequest, fmt.Errorf("collection %s does not exist", collectionName))
return
}
for _, server := range collection.ListVolumeServers() {
@@ -35,7 +36,10 @@ func (ms *MasterServer) collectionDeleteHandler(w http.ResponseWriter, r *http.R
return
}
}
- ms.Topo.DeleteCollection(r.FormValue("collection"))
+ ms.Topo.DeleteCollection(collectionName)
+
+ w.WriteHeader(http.StatusNoContent)
+ return
}
func (ms *MasterServer) dirStatusHandler(w http.ResponseWriter, r *http.Request) {
@@ -53,6 +57,7 @@ func (ms *MasterServer) volumeVacuumHandler(w http.ResponseWriter, r *http.Reque
gcThreshold, err = strconv.ParseFloat(gcString, 32)
if err != nil {
glog.V(0).Infof("garbageThreshold %s is not a valid float number: %v", gcString, err)
+ writeJsonError(w, r, http.StatusNotAcceptable, fmt.Errorf("garbageThreshold %s is not a valid float number", gcString))
return
}
}
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index 8140a06f6..242480197 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -252,9 +252,14 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
startOffset, bytesToRead := req.Offset, req.Size
for bytesToRead > 0 {
- bytesread, err := ecShard.ReadAt(buffer, startOffset)
+ // min of bytesToRead and bufSize
+ bufferSize := bufSize
+ if bufferSize > bytesToRead {
+ bufferSize = bytesToRead
+ }
+ bytesread, err := ecShard.ReadAt(buffer[0:bufferSize], startOffset)
- // println(fileName, "read", bytesread, "bytes, with target", bytesToRead)
+ // println("read", ecShard.FileName(), "startOffset", startOffset, bytesread, "bytes, with target", bufferSize)
if bytesread > 0 {
if int64(bytesread) > bytesToRead {
@@ -268,6 +273,7 @@ func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardRea
return err
}
+ startOffset += int64(bytesread)
bytesToRead -= int64(bytesread)
}
diff --git a/weed/storage/store.go b/weed/storage/store.go
index f46b77b7b..4d1061bed 100644
--- a/weed/storage/store.go
+++ b/weed/storage/store.go
@@ -165,8 +165,9 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
var maxFileKey NeedleId
collectionVolumeSize := make(map[string]uint64)
for _, location := range s.Locations {
+ var deleteVids []needle.VolumeId
maxVolumeCount = maxVolumeCount + location.MaxVolumeCount
- location.Lock()
+ location.RLock()
for _, v := range location.volumes {
if maxFileKey < v.MaxFileKey() {
maxFileKey = v.MaxFileKey()
@@ -175,8 +176,7 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
volumeMessages = append(volumeMessages, v.ToVolumeInformationMessage())
} else {
if v.expiredLongEnough(MAX_TTL_VOLUME_REMOVAL_DELAY) {
- location.deleteVolumeById(v.Id)
- glog.V(0).Infoln("volume", v.Id, "is deleted.")
+ deleteVids = append(deleteVids, v.Id)
} else {
glog.V(0).Infoln("volume", v.Id, "is expired.")
}
@@ -184,7 +184,17 @@ func (s *Store) CollectHeartbeat() *master_pb.Heartbeat {
fileSize, _, _ := v.FileStat()
collectionVolumeSize[v.Collection] += fileSize
}
- location.Unlock()
+ location.RUnlock()
+
+ if len(deleteVids) > 0 {
+ // delete expired volumes.
+ location.Lock()
+ for _, vid := range deleteVids {
+ location.deleteVolumeById(vid)
+ glog.V(0).Infoln("volume", vid, "is deleted.")
+ }
+ location.Unlock()
+ }
}
for col, size := range collectionVolumeSize {
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index 8271324cf..7e3f1a46c 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -288,7 +288,7 @@ func (s *Store) readRemoteEcShardInterval(ctx context.Context, sourceDataNodes [
}
for _, sourceDataNode := range sourceDataNodes {
- glog.V(4).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
+ glog.V(3).Infof("read remote ec shard %d.%d from %s", vid, shardId, sourceDataNode)
n, is_deleted, err = s.doReadRemoteEcShardInterval(ctx, sourceDataNode, needleId, vid, shardId, buf, offset)
if err == nil {
return
@@ -340,7 +340,7 @@ func (s *Store) doReadRemoteEcShardInterval(ctx context.Context, sourceDataNode
}
func (s *Store) recoverOneRemoteEcShardInterval(ctx context.Context, needleId types.NeedleId, ecVolume *erasure_coding.EcVolume, shardIdToRecover erasure_coding.ShardId, buf []byte, offset int64) (n int, is_deleted bool, err error) {
- glog.V(4).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
+ glog.V(3).Infof("recover ec shard %d.%d from other locations", ecVolume.VolumeId, shardIdToRecover)
enc, err := reedsolomon.New(erasure_coding.DataShardsCount, erasure_coding.ParityShardsCount)
if err != nil {
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index ea0769248..b7ebe8af5 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -125,7 +125,7 @@ func (t *Topology) PickForWrite(count uint64, option *VolumeGrowOption) (string,
if datanodes.Length() == 0 {
return "", 0, nil, fmt.Errorf("no writable volumes available for collection:%s replication:%s ttl:%s", option.Collection, option.ReplicaPlacement.String(), option.Ttl.String())
}
- fileId, count := t.Sequence.NextFileId(count)
+ fileId := t.Sequence.NextFileId(count)
return needle.NewFileId(*vid, fileId, rand.Uint32()).String(), count, datanodes.Head(), nil
}
diff --git a/weed/topology/topology_map.go b/weed/topology/topology_map.go
index 37a88c9ed..0ad30f12e 100644
--- a/weed/topology/topology_map.go
+++ b/weed/topology/topology_map.go
@@ -23,7 +23,7 @@ func (t *Topology) ToMap() interface{} {
}
}
}
- m["layouts"] = layouts
+ m["Layouts"] = layouts
return m
}