aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--weed/filer/filechunk_group.go21
-rw-r--r--weed/filer/filechunk_section.go30
-rw-r--r--weed/filer/filechunk_section_test.go48
-rw-r--r--weed/filer/filechunks.go5
-rw-r--r--weed/filer/meta_aggregator.go12
-rw-r--r--weed/shell/command_cluster_ps.go21
6 files changed, 112 insertions, 25 deletions
diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go
index 5dbf16a5c..de469f310 100644
--- a/weed/filer/filechunk_group.go
+++ b/weed/filer/filechunk_group.go
@@ -8,11 +8,10 @@ import (
)
type ChunkGroup struct {
- lookupFn wdclient.LookupFileIdFunctionType
- chunkCache chunk_cache.ChunkCache
- manifestChunks []*filer_pb.FileChunk
- sections map[SectionIndex]*FileChunkSection
- sectionsLock sync.RWMutex
+ lookupFn wdclient.LookupFileIdFunctionType
+ chunkCache chunk_cache.ChunkCache
+ sections map[SectionIndex]*FileChunkSection
+ sectionsLock sync.RWMutex
}
func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) {
@@ -69,6 +68,9 @@ func (group *ChunkGroup) ReadDataAt(fileSize int64, buff []byte, offset int64) (
}
func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
+ group.sectionsLock.RLock()
+ defer group.sectionsLock.RUnlock()
+
var dataChunks []*filer_pb.FileChunk
for _, chunk := range chunks {
@@ -82,21 +84,24 @@ func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error {
return err
}
- group.manifestChunks = append(group.manifestChunks, chunk)
dataChunks = append(dataChunks, resolvedChunks...)
}
+ sections := make(map[SectionIndex]*FileChunkSection)
+
for _, chunk := range dataChunks {
sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize)
for si := sectionIndexStart; si < sectionIndexStop+1; si++ {
- section, found := group.sections[si]
+ section, found := sections[si]
if !found {
section = NewFileChunkSection(si)
- group.sections[si] = section
+ sections[si] = section
}
section.chunks = append(section.chunks, chunk)
}
}
+
+ group.sections = sections
return nil
}
diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go
index 5804c7160..350cc44d1 100644
--- a/weed/filer/filechunk_section.go
+++ b/weed/filer/filechunk_section.go
@@ -30,19 +30,12 @@ func (section *FileChunkSection) addChunk(chunk *filer_pb.FileChunk) error {
section.chunks = append(section.chunks, chunk)
- if section.visibleIntervals != nil {
+ if section.visibleIntervals == nil {
+ section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
+ } else {
MergeIntoVisibles(section.visibleIntervals, start, stop, chunk)
garbageFileIds := FindGarbageChunks(section.visibleIntervals, start, stop)
- for _, garbageFileId := range garbageFileIds {
- length := len(section.chunks)
- for i, t := range section.chunks {
- if t.FileId == garbageFileId {
- section.chunks[i] = section.chunks[length-1]
- section.chunks = section.chunks[:length-1]
- break
- }
- }
- }
+ removeGarbageChunks(section, garbageFileIds)
}
if section.chunkViews != nil {
@@ -52,6 +45,21 @@ func (section *FileChunkSection) addChunk(chunk *filer_pb.FileChunk) error {
return nil
}
+func removeGarbageChunks(section *FileChunkSection, garbageFileIds map[string]struct{}) {
+ for i := 0; i < len(section.chunks); {
+ t := section.chunks[i]
+ length := len(section.chunks)
+ if _, found := garbageFileIds[t.FileId]; found {
+ if i < length-1 {
+ section.chunks[i] = section.chunks[length-1]
+ }
+ section.chunks = section.chunks[:length-1]
+ } else {
+ i++
+ }
+ }
+}
+
func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) {
if section.visibleIntervals == nil {
section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize)
diff --git a/weed/filer/filechunk_section_test.go b/weed/filer/filechunk_section_test.go
new file mode 100644
index 000000000..e4536540b
--- /dev/null
+++ b/weed/filer/filechunk_section_test.go
@@ -0,0 +1,48 @@
+package filer
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "testing"
+)
+
+func Test_removeGarbageChunks(t *testing.T) {
+ section := NewFileChunkSection(0)
+ section.addChunk(&filer_pb.FileChunk{
+ FileId: "0",
+ Offset: 0,
+ Size: 1,
+ ModifiedTsNs: 0,
+ })
+ section.addChunk(&filer_pb.FileChunk{
+ FileId: "1",
+ Offset: 1,
+ Size: 1,
+ ModifiedTsNs: 1,
+ })
+ section.addChunk(&filer_pb.FileChunk{
+ FileId: "2",
+ Offset: 2,
+ Size: 1,
+ ModifiedTsNs: 2,
+ })
+ section.addChunk(&filer_pb.FileChunk{
+ FileId: "3",
+ Offset: 3,
+ Size: 1,
+ ModifiedTsNs: 3,
+ })
+ section.addChunk(&filer_pb.FileChunk{
+ FileId: "4",
+ Offset: 4,
+ Size: 1,
+ ModifiedTsNs: 4,
+ })
+ garbageFileIds := make(map[string]struct{})
+ garbageFileIds["0"] = struct{}{}
+ garbageFileIds["2"] = struct{}{}
+ garbageFileIds["4"] = struct{}{}
+ removeGarbageChunks(section, garbageFileIds)
+ if len(section.chunks) != 2 {
+ t.Errorf("remove chunk 2 failed")
+ }
+}
diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go
index 480478fd7..7c8bb2fe1 100644
--- a/weed/filer/filechunks.go
+++ b/weed/filer/filechunks.go
@@ -86,12 +86,13 @@ func SeparateGarbageChunks(visibles *IntervalList[*VisibleInterval], chunks []*f
return compacted, garbage
}
-func FindGarbageChunks(visibles *IntervalList[*VisibleInterval], start int64, stop int64) (garbageFileId []string) {
+func FindGarbageChunks(visibles *IntervalList[*VisibleInterval], start int64, stop int64) (garbageFileIds map[string]struct{}) {
+ garbageFileIds = make(map[string]struct{})
for x := visibles.Front(); x != nil; x = x.Next {
interval := x.Value
offset := interval.start - interval.offsetInChunk
if start <= offset && offset+int64(interval.chunkSize) <= stop {
- garbageFileId = append(garbageFileId, interval.fileId)
+ garbageFileIds[interval.fileId] = struct{}{}
}
}
return
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go
index fbc163442..8cd7d5bf9 100644
--- a/weed/filer/meta_aggregator.go
+++ b/weed/filer/meta_aggregator.go
@@ -243,10 +243,15 @@ const (
MetaOffsetPrefix = "Meta"
)
-func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {
-
+func GetPeerMetaOffsetKey(peerSignature int32) []byte {
key := []byte(MetaOffsetPrefix + "xxxx")
util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
+ return key
+}
+
+func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) {
+
+ key := GetPeerMetaOffsetKey(peerSignature)
value, err := f.Store.KvGet(context.Background(), key)
@@ -263,8 +268,7 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignat
func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) {
- key := []byte(MetaOffsetPrefix + "xxxx")
- util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature))
+ key := GetPeerMetaOffsetKey(peerSignature)
value := make([]byte, 8)
util.Uint64toBytes(value, uint64(lastTsNs))
diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go
index 5c495b2e2..fc6725fad 100644
--- a/weed/shell/command_cluster_ps.go
+++ b/weed/shell/command_cluster_ps.go
@@ -5,10 +5,13 @@ import (
"flag"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/cluster"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
"io"
+ "time"
"github.com/seaweedfs/seaweedfs/weed/pb/master_pb"
)
@@ -92,6 +95,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W
}
}
+ filerSignatures := make(map[*master_pb.ListClusterNodesResponse_ClusterNode]int32)
fmt.Fprintf(writer, "* filers %d\n", len(filerNodes))
for _, node := range filerNodes {
fmt.Fprintf(writer, " * %s (%v)\n", node.Address, node.Version)
@@ -108,12 +112,29 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W
fmt.Fprintf(writer, " filer group: %s\n", resp.FilerGroup)
}
fmt.Fprintf(writer, " signature: %d\n", resp.Signature)
+ filerSignatures[node] = resp.Signature
} else {
fmt.Fprintf(writer, " failed to connect: %v\n", err)
}
return err
})
}
+ for _, node := range filerNodes {
+ pb.WithFilerClient(false, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error {
+ fmt.Fprintf(writer, "* filer %s metadata sync time\n", node.Address)
+ selfSignature := filerSignatures[node]
+ for peer, peerSignature := range filerSignatures {
+ if selfSignature == peerSignature {
+ continue
+ }
+ if resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: filer.GetPeerMetaOffsetKey(peerSignature)}); err == nil && len(resp.Value) == 8 {
+ lastTsNs := int64(util.BytesToUint64(resp.Value))
+ fmt.Fprintf(writer, " %s: %v\n", peer.Address, time.Unix(0, lastTsNs).UTC())
+ }
+ }
+ return nil
+ })
+ }
// collect volume servers
var volumeServers []pb.ServerAddress