aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/server/volume_grpc_erasure_coding.go4
-rw-r--r--weed/server/volume_server_handlers_read.go6
-rw-r--r--weed/storage/disk_location.go14
-rw-r--r--weed/storage/disk_location_ec.go47
-rw-r--r--weed/storage/disk_location_ec_test.go2
-rw-r--r--weed/storage/erasure_coding/ec_volume.go40
-rw-r--r--weed/storage/store_ec.go24
7 files changed, 64 insertions, 73 deletions
diff --git a/weed/server/volume_grpc_erasure_coding.go b/weed/server/volume_grpc_erasure_coding.go
index aa0f80442..6cb826d30 100644
--- a/weed/server/volume_grpc_erasure_coding.go
+++ b/weed/server/volume_grpc_erasure_coding.go
@@ -151,11 +151,11 @@ func (vs *VolumeServer) VolumeEcShardsUnmount(ctx context.Context, req *volume_s
func (vs *VolumeServer) VolumeEcShardRead(req *volume_server_pb.VolumeEcShardReadRequest, stream volume_server_pb.VolumeServer_VolumeEcShardReadServer) error {
- ecShards, found := vs.store.HasEcShard(needle.VolumeId(req.VolumeId))
+ ecVolume, found := vs.store.FindEcVolume(needle.VolumeId(req.VolumeId))
if !found {
return fmt.Errorf("not found ec volume id %d", req.VolumeId)
}
- ecShard, found := ecShards.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
+ ecShard, found := ecVolume.FindEcVolumeShard(erasure_coding.ShardId(req.ShardId))
if !found {
return fmt.Errorf("not found ec shard %d.%d", req.VolumeId, req.ShardId)
}
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 2e32bae5f..839e17fbe 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -42,8 +42,8 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
glog.V(4).Infoln("volume", volumeId, "reading", n)
hasVolume := vs.store.HasVolume(volumeId)
- _, hasEcShard := vs.store.HasEcShard(volumeId)
- if !hasVolume && !hasEcShard {
+ _, hasEcVolume := vs.store.FindEcVolume(volumeId)
+ if !hasVolume && !hasEcVolume {
if !vs.ReadRedirect {
glog.V(2).Infoln("volume is not local:", err, r.URL.Path)
w.WriteHeader(http.StatusNotFound)
@@ -71,7 +71,7 @@ func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
var count int
if hasVolume {
count, err = vs.store.ReadVolumeNeedle(volumeId, n)
- } else if hasEcShard {
+ } else if hasEcVolume {
count, err = vs.store.ReadEcShardNeedle(context.Background(), volumeId, n)
}
glog.V(4).Infoln("read bytes", count, "error", err)
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index bd6d467d9..0aedb0f47 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -20,14 +20,14 @@ type DiskLocation struct {
sync.RWMutex
// erasure coding
- ecShards map[needle.VolumeId]erasure_coding.EcVolumeShards
- ecShardsLock sync.RWMutex
+ ecVolumes map[needle.VolumeId]*erasure_coding.EcVolume
+ ecVolumesLock sync.RWMutex
}
func NewDiskLocation(dir string, maxVolumeCount int) *DiskLocation {
location := &DiskLocation{Directory: dir, MaxVolumeCount: maxVolumeCount}
location.volumes = make(map[needle.VolumeId]*Volume)
- location.ecShards = make(map[needle.VolumeId]erasure_coding.EcVolumeShards)
+ location.ecVolumes = make(map[needle.VolumeId]*erasure_coding.EcVolume)
return location
}
@@ -109,7 +109,7 @@ func (l *DiskLocation) loadExistingVolumes(needleMapKind NeedleMapType) {
glog.V(0).Infof("Store started on dir: %s with %d volumes max %d", l.Directory, len(l.volumes), l.MaxVolumeCount)
l.loadAllEcShards()
- glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecShards))
+ glog.V(0).Infof("Store started on dir: %s with %d ec shards", l.Directory, len(l.ecVolumes))
}
@@ -208,11 +208,11 @@ func (l *DiskLocation) Close() {
}
l.Unlock()
- l.ecShardsLock.Lock()
- for _, shards := range l.ecShards {
+ l.ecVolumesLock.Lock()
+ for _, shards := range l.ecVolumes {
shards.Close()
}
- l.ecShardsLock.Unlock()
+ l.ecVolumesLock.Unlock()
return
}
diff --git a/weed/storage/disk_location_ec.go b/weed/storage/disk_location_ec.go
index e91c0f262..2aec1502b 100644
--- a/weed/storage/disk_location_ec.go
+++ b/weed/storage/disk_location_ec.go
@@ -16,26 +16,26 @@ var (
re = regexp.MustCompile("\\.ec[0-9][0-9]")
)
-func (l *DiskLocation) HasEcShard(vid needle.VolumeId) (erasure_coding.EcVolumeShards, bool) {
- l.ecShardsLock.RLock()
- defer l.ecShardsLock.RUnlock()
+func (l *DiskLocation) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
+ l.ecVolumesLock.RLock()
+ defer l.ecVolumesLock.RUnlock()
- ecShards, ok := l.ecShards[vid]
+ ecVolume, ok := l.ecVolumes[vid]
if ok {
- return ecShards, true
+ return ecVolume, true
}
return nil, false
}
func (l *DiskLocation) FindEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) (*erasure_coding.EcVolumeShard, bool) {
- l.ecShardsLock.RLock()
- defer l.ecShardsLock.RUnlock()
+ l.ecVolumesLock.RLock()
+ defer l.ecVolumesLock.RUnlock()
- ecShards, ok := l.ecShards[vid]
+ ecVolume, ok := l.ecVolumes[vid]
if !ok {
return nil, false
}
- for _, ecShard := range ecShards {
+ for _, ecShard := range ecVolume.Shards {
if ecShard.ShardId == shardId {
return ecShard, true
}
@@ -49,40 +49,29 @@ func (l *DiskLocation) LoadEcShard(collection string, vid needle.VolumeId, shard
if err != nil {
return fmt.Errorf("failed to create ec shard %d.%d: %v", vid, shardId, err)
}
- l.ecShardsLock.Lock()
- l.ecShards[vid] = append(l.ecShards[vid], ecVolumeShard)
- l.ecShardsLock.Unlock()
+ l.ecVolumesLock.Lock()
+ l.ecVolumes[vid].AddEcVolumeShard(ecVolumeShard)
+ l.ecVolumesLock.Unlock()
return nil
}
func (l *DiskLocation) UnloadEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId) bool {
- l.ecShardsLock.Lock()
- defer l.ecShardsLock.Unlock()
+ l.ecVolumesLock.Lock()
+ defer l.ecVolumesLock.Unlock()
- vidShards, found := l.ecShards[vid]
+ ecVolume, found := l.ecVolumes[vid]
if !found {
return false
}
- shardIndex := -1
- for i, shard := range vidShards {
- if shard.ShardId == shardId {
- shardIndex = i
- break
+ if deleted := ecVolume.DeleteEcVolumeShard(shardId); deleted {
+ if len(ecVolume.Shards) == 0 {
+ delete(l.ecVolumes, vid)
}
- }
- if shardIndex < 0 {
- return false
- }
-
- if len(vidShards) == 1 {
- delete(l.ecShards, vid)
return true
}
- l.ecShards[vid] = append(vidShards[:shardIndex], vidShards[shardIndex+1:]...)
-
return true
}
diff --git a/weed/storage/disk_location_ec_test.go b/weed/storage/disk_location_ec_test.go
index 99cb242a4..a65eb906b 100644
--- a/weed/storage/disk_location_ec_test.go
+++ b/weed/storage/disk_location_ec_test.go
@@ -11,7 +11,7 @@ func TestLoadingEcShards(t *testing.T) {
t.Errorf("load all ec shards: %v", err)
}
- if len(dl.ecShards)!=1 {
+ if len(dl.ecVolumes)!=1 {
t.Errorf("loading err")
}
} \ No newline at end of file
diff --git a/weed/storage/erasure_coding/ec_volume.go b/weed/storage/erasure_coding/ec_volume.go
index 881e88d5f..5fba31f95 100644
--- a/weed/storage/erasure_coding/ec_volume.go
+++ b/weed/storage/erasure_coding/ec_volume.go
@@ -9,26 +9,28 @@ import (
"github.com/chrislusf/seaweedfs/weed/storage/types"
)
-type EcVolumeShards []*EcVolumeShard
+type EcVolume struct {
+ Shards []*EcVolumeShard
+}
-func (shards *EcVolumeShards) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
- for _, s := range *shards {
+func (ev *EcVolume) AddEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
+ for _, s := range ev.Shards {
if s.ShardId == ecVolumeShard.ShardId {
return false
}
}
- *shards = append(*shards, ecVolumeShard)
- sort.Slice(shards, func(i, j int) bool {
- return (*shards)[i].VolumeId < (*shards)[j].VolumeId ||
- (*shards)[i].VolumeId == (*shards)[j].VolumeId && (*shards)[i].ShardId < (*shards)[j].ShardId
+ ev.Shards = append(ev.Shards, ecVolumeShard)
+ sort.Slice(ev, func(i, j int) bool {
+ return ev.Shards[i].VolumeId < ev.Shards[j].VolumeId ||
+ ev.Shards[i].VolumeId == ev.Shards[j].VolumeId && ev.Shards[i].ShardId < ev.Shards[j].ShardId
})
return true
}
-func (shards *EcVolumeShards) DeleteEcVolumeShard(ecVolumeShard *EcVolumeShard) bool {
+func (ev *EcVolume) DeleteEcVolumeShard(shardId ShardId) bool {
foundPosition := -1
- for i, s := range *shards {
- if s.ShardId == ecVolumeShard.ShardId {
+ for i, s := range ev.Shards {
+ if s.ShardId == shardId {
foundPosition = i
}
}
@@ -36,12 +38,12 @@ func (shards *EcVolumeShards) DeleteEcVolumeShard(ecVolumeShard *EcVolumeShard)
return false
}
- *shards = append((*shards)[:foundPosition], (*shards)[foundPosition+1:]...)
+ ev.Shards = append(ev.Shards[:foundPosition], ev.Shards[foundPosition+1:]...)
return true
}
-func (shards *EcVolumeShards) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
- for _, s := range *shards {
+func (ev *EcVolume) FindEcVolumeShard(shardId ShardId) (ecVolumeShard *EcVolumeShard, found bool) {
+ for _, s := range ev.Shards {
if s.ShardId == shardId {
return s, true
}
@@ -49,16 +51,16 @@ func (shards *EcVolumeShards) FindEcVolumeShard(shardId ShardId) (ecVolumeShard
return nil, false
}
-func (shards *EcVolumeShards) Close() {
- for _, s := range *shards {
+func (ev *EcVolume) Close() {
+ for _, s := range ev.Shards {
s.Close()
}
}
-func (shards *EcVolumeShards) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
+func (ev *EcVolume) ToVolumeEcShardInformationMessage() (messages []*master_pb.VolumeEcShardInformationMessage) {
prevVolumeId := needle.VolumeId(math.MaxUint32)
var m *master_pb.VolumeEcShardInformationMessage
- for _, s := range *shards {
+ for _, s := range ev.Shards {
if s.VolumeId != prevVolumeId {
m = &master_pb.VolumeEcShardInformationMessage{
Id: uint32(s.VolumeId),
@@ -72,9 +74,9 @@ func (shards *EcVolumeShards) ToVolumeEcShardInformationMessage() (messages []*m
return
}
-func (shards *EcVolumeShards) LocateEcShardNeedle(n *needle.Needle) (offset types.Offset, size uint32, intervals []Interval, err error) {
+func (ev *EcVolume) LocateEcShardNeedle(n *needle.Needle) (offset types.Offset, size uint32, intervals []Interval, err error) {
- shard := (*shards)[0]
+ shard := ev.Shards[0]
// find the needle from ecx file
offset, size, err = shard.findNeedleFromEcx(n.Id)
if err != nil {
diff --git a/weed/storage/store_ec.go b/weed/storage/store_ec.go
index ea08fc6bb..a75529fbd 100644
--- a/weed/storage/store_ec.go
+++ b/weed/storage/store_ec.go
@@ -16,11 +16,11 @@ import (
func (s *Store) CollectErasureCodingHeartbeat() *master_pb.Heartbeat {
var ecShardMessages []*master_pb.VolumeEcShardInformationMessage
for _, location := range s.Locations {
- location.ecShardsLock.RLock()
- for _, ecShards := range location.ecShards {
+ location.ecVolumesLock.RLock()
+ for _, ecShards := range location.ecVolumes {
ecShardMessages = append(ecShardMessages, ecShards.ToVolumeEcShardInformationMessage()...)
}
- location.ecShardsLock.RUnlock()
+ location.ecVolumesLock.RUnlock()
}
return &master_pb.Heartbeat{
@@ -82,9 +82,9 @@ func (s *Store) findEcShard(vid needle.VolumeId, shardId erasure_coding.ShardId)
return nil, false
}
-func (s *Store) HasEcShard(vid needle.VolumeId) (erasure_coding.EcVolumeShards, bool) {
+func (s *Store) FindEcVolume(vid needle.VolumeId) (*erasure_coding.EcVolume, bool) {
for _, location := range s.Locations {
- if s, found := location.HasEcShard(vid); found {
+ if s, found := location.FindEcVolume(vid); found {
return s, true
}
}
@@ -93,14 +93,14 @@ func (s *Store) HasEcShard(vid needle.VolumeId) (erasure_coding.EcVolumeShards,
func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *needle.Needle) (int, error) {
for _, location := range s.Locations {
- if localEcShards, found := location.HasEcShard(vid); found {
+ if localEcVolume, found := location.FindEcVolume(vid); found {
- offset, size, intervals, err := localEcShards.LocateEcShardNeedle(n)
+ offset, size, intervals, err := localEcVolume.LocateEcShardNeedle(n)
if err != nil {
return 0, err
}
- bytes, err := s.readEcShardIntervals(ctx, vid, localEcShards, intervals)
+ bytes, err := s.readEcShardIntervals(ctx, vid, localEcVolume, intervals)
if err != nil {
return 0, fmt.Errorf("ReadEcShardIntervals: %v", err)
}
@@ -118,14 +118,14 @@ func (s *Store) ReadEcShardNeedle(ctx context.Context, vid needle.VolumeId, n *n
return 0, fmt.Errorf("ec shard %d not found", vid)
}
-func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, localEcShards erasure_coding.EcVolumeShards, intervals []erasure_coding.Interval) (data []byte, err error) {
+func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume, intervals []erasure_coding.Interval) (data []byte, err error) {
shardLocations, err := s.cachedLookupEcShardLocations(ctx, vid)
if err != nil {
return nil, fmt.Errorf("failed to locate shard via master grpc %s: %v", s.MasterGrpcAddress, err)
}
for i, interval := range intervals {
- if d, e := s.readOneEcShardInterval(ctx, vid, localEcShards, shardLocations, interval); e != nil {
+ if d, e := s.readOneEcShardInterval(ctx, vid, ecVolume, shardLocations, interval); e != nil {
return nil, e
} else {
if i == 0 {
@@ -138,10 +138,10 @@ func (s *Store) readEcShardIntervals(ctx context.Context, vid needle.VolumeId, l
return
}
-func (s *Store) readOneEcShardInterval(ctx context.Context, vid needle.VolumeId, localEcShards erasure_coding.EcVolumeShards, shardLocations map[erasure_coding.ShardId]string, interval erasure_coding.Interval) (data []byte, err error) {
+func (s *Store) readOneEcShardInterval(ctx context.Context, vid needle.VolumeId, ecVolume *erasure_coding.EcVolume, shardLocations map[erasure_coding.ShardId]string, interval erasure_coding.Interval) (data []byte, err error) {
shardId, actualOffset := interval.ToShardIdAndOffset(erasure_coding.ErasureCodingLargeBlockSize, erasure_coding.ErasureCodingSmallBlockSize)
data = make([]byte, interval.Size)
- if shard, found := localEcShards.FindEcVolumeShard(shardId); found {
+ if shard, found := ecVolume.FindEcVolumeShard(shardId); found {
if _, err = shard.ReadAt(data, actualOffset); err != nil {
return
}