diff options
| -rw-r--r-- | weed/filer/filechunk_group.go | 21 | ||||
| -rw-r--r-- | weed/filer/filechunk_section.go | 30 | ||||
| -rw-r--r-- | weed/filer/filechunk_section_test.go | 48 | ||||
| -rw-r--r-- | weed/filer/filechunks.go | 5 | ||||
| -rw-r--r-- | weed/filer/meta_aggregator.go | 12 | ||||
| -rw-r--r-- | weed/shell/command_cluster_ps.go | 21 |
6 files changed, 112 insertions, 25 deletions
diff --git a/weed/filer/filechunk_group.go b/weed/filer/filechunk_group.go index 5dbf16a5c..de469f310 100644 --- a/weed/filer/filechunk_group.go +++ b/weed/filer/filechunk_group.go @@ -8,11 +8,10 @@ import ( ) type ChunkGroup struct { - lookupFn wdclient.LookupFileIdFunctionType - chunkCache chunk_cache.ChunkCache - manifestChunks []*filer_pb.FileChunk - sections map[SectionIndex]*FileChunkSection - sectionsLock sync.RWMutex + lookupFn wdclient.LookupFileIdFunctionType + chunkCache chunk_cache.ChunkCache + sections map[SectionIndex]*FileChunkSection + sectionsLock sync.RWMutex } func NewChunkGroup(lookupFn wdclient.LookupFileIdFunctionType, chunkCache chunk_cache.ChunkCache, chunks []*filer_pb.FileChunk) (*ChunkGroup, error) { @@ -69,6 +68,9 @@ func (group *ChunkGroup) ReadDataAt(fileSize int64, buff []byte, offset int64) ( } func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error { + group.sectionsLock.RLock() + defer group.sectionsLock.RUnlock() + var dataChunks []*filer_pb.FileChunk for _, chunk := range chunks { @@ -82,21 +84,24 @@ func (group *ChunkGroup) SetChunks(chunks []*filer_pb.FileChunk) error { return err } - group.manifestChunks = append(group.manifestChunks, chunk) dataChunks = append(dataChunks, resolvedChunks...) } + sections := make(map[SectionIndex]*FileChunkSection) + for _, chunk := range dataChunks { sectionIndexStart, sectionIndexStop := SectionIndex(chunk.Offset/SectionSize), SectionIndex((chunk.Offset+int64(chunk.Size))/SectionSize) for si := sectionIndexStart; si < sectionIndexStop+1; si++ { - section, found := group.sections[si] + section, found := sections[si] if !found { section = NewFileChunkSection(si) - group.sections[si] = section + sections[si] = section } section.chunks = append(section.chunks, chunk) } } + + group.sections = sections return nil } diff --git a/weed/filer/filechunk_section.go b/weed/filer/filechunk_section.go index 5804c7160..350cc44d1 100644 --- a/weed/filer/filechunk_section.go +++ b/weed/filer/filechunk_section.go @@ -30,19 +30,12 @@ func (section *FileChunkSection) addChunk(chunk *filer_pb.FileChunk) error { section.chunks = append(section.chunks, chunk) - if section.visibleIntervals != nil { + if section.visibleIntervals == nil { + section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize) + } else { MergeIntoVisibles(section.visibleIntervals, start, stop, chunk) garbageFileIds := FindGarbageChunks(section.visibleIntervals, start, stop) - for _, garbageFileId := range garbageFileIds { - length := len(section.chunks) - for i, t := range section.chunks { - if t.FileId == garbageFileId { - section.chunks[i] = section.chunks[length-1] - section.chunks = section.chunks[:length-1] - break - } - } - } + removeGarbageChunks(section, garbageFileIds) } if section.chunkViews != nil { @@ -52,6 +45,21 @@ func (section *FileChunkSection) addChunk(chunk *filer_pb.FileChunk) error { return nil } +func removeGarbageChunks(section *FileChunkSection, garbageFileIds map[string]struct{}) { + for i := 0; i < len(section.chunks); { + t := section.chunks[i] + length := len(section.chunks) + if _, found := garbageFileIds[t.FileId]; found { + if i < length-1 { + section.chunks[i] = section.chunks[length-1] + } + section.chunks = section.chunks[:length-1] + } else { + i++ + } + } +} + func (section *FileChunkSection) setupForRead(group *ChunkGroup, fileSize int64) { if section.visibleIntervals == nil { section.visibleIntervals = readResolvedChunks(section.chunks, int64(section.sectionIndex)*SectionSize, (int64(section.sectionIndex)+1)*SectionSize) diff --git a/weed/filer/filechunk_section_test.go b/weed/filer/filechunk_section_test.go new file mode 100644 index 000000000..e4536540b --- /dev/null +++ b/weed/filer/filechunk_section_test.go @@ -0,0 +1,48 @@ +package filer + +import ( + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "testing" +) + +func Test_removeGarbageChunks(t *testing.T) { + section := NewFileChunkSection(0) + section.addChunk(&filer_pb.FileChunk{ + FileId: "0", + Offset: 0, + Size: 1, + ModifiedTsNs: 0, + }) + section.addChunk(&filer_pb.FileChunk{ + FileId: "1", + Offset: 1, + Size: 1, + ModifiedTsNs: 1, + }) + section.addChunk(&filer_pb.FileChunk{ + FileId: "2", + Offset: 2, + Size: 1, + ModifiedTsNs: 2, + }) + section.addChunk(&filer_pb.FileChunk{ + FileId: "3", + Offset: 3, + Size: 1, + ModifiedTsNs: 3, + }) + section.addChunk(&filer_pb.FileChunk{ + FileId: "4", + Offset: 4, + Size: 1, + ModifiedTsNs: 4, + }) + garbageFileIds := make(map[string]struct{}) + garbageFileIds["0"] = struct{}{} + garbageFileIds["2"] = struct{}{} + garbageFileIds["4"] = struct{}{} + removeGarbageChunks(section, garbageFileIds) + if len(section.chunks) != 2 { + t.Errorf("remove chunk 2 failed") + } +} diff --git a/weed/filer/filechunks.go b/weed/filer/filechunks.go index 480478fd7..7c8bb2fe1 100644 --- a/weed/filer/filechunks.go +++ b/weed/filer/filechunks.go @@ -86,12 +86,13 @@ func SeparateGarbageChunks(visibles *IntervalList[*VisibleInterval], chunks []*f return compacted, garbage } -func FindGarbageChunks(visibles *IntervalList[*VisibleInterval], start int64, stop int64) (garbageFileId []string) { +func FindGarbageChunks(visibles *IntervalList[*VisibleInterval], start int64, stop int64) (garbageFileIds map[string]struct{}) { + garbageFileIds = make(map[string]struct{}) for x := visibles.Front(); x != nil; x = x.Next { interval := x.Value offset := interval.start - interval.offsetInChunk if start <= offset && offset+int64(interval.chunkSize) <= stop { - garbageFileId = append(garbageFileId, interval.fileId) + garbageFileIds[interval.fileId] = struct{}{} } } return diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index fbc163442..8cd7d5bf9 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -243,10 +243,15 @@ const ( MetaOffsetPrefix = "Meta" ) -func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) { - +func GetPeerMetaOffsetKey(peerSignature int32) []byte { key := []byte(MetaOffsetPrefix + "xxxx") util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) + return key +} + +func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignature int32) (lastTsNs int64, err error) { + + key := GetPeerMetaOffsetKey(peerSignature) value, err := f.Store.KvGet(context.Background(), key) @@ -263,8 +268,7 @@ func (ma *MetaAggregator) readOffset(f *Filer, peer pb.ServerAddress, peerSignat func (ma *MetaAggregator) updateOffset(f *Filer, peer pb.ServerAddress, peerSignature int32, lastTsNs int64) (err error) { - key := []byte(MetaOffsetPrefix + "xxxx") - util.Uint32toBytes(key[len(MetaOffsetPrefix):], uint32(peerSignature)) + key := GetPeerMetaOffsetKey(peerSignature) value := make([]byte, 8) util.Uint64toBytes(value, uint64(lastTsNs)) diff --git a/weed/shell/command_cluster_ps.go b/weed/shell/command_cluster_ps.go index 5c495b2e2..fc6725fad 100644 --- a/weed/shell/command_cluster_ps.go +++ b/weed/shell/command_cluster_ps.go @@ -5,10 +5,13 @@ import ( "flag" "fmt" "github.com/seaweedfs/seaweedfs/weed/cluster" + "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/pb" "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" "github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb" + "github.com/seaweedfs/seaweedfs/weed/util" "io" + "time" "github.com/seaweedfs/seaweedfs/weed/pb/master_pb" ) @@ -92,6 +95,7 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W } } + filerSignatures := make(map[*master_pb.ListClusterNodesResponse_ClusterNode]int32) fmt.Fprintf(writer, "* filers %d\n", len(filerNodes)) for _, node := range filerNodes { fmt.Fprintf(writer, " * %s (%v)\n", node.Address, node.Version) @@ -108,12 +112,29 @@ func (c *commandClusterPs) Do(args []string, commandEnv *CommandEnv, writer io.W fmt.Fprintf(writer, " filer group: %s\n", resp.FilerGroup) } fmt.Fprintf(writer, " signature: %d\n", resp.Signature) + filerSignatures[node] = resp.Signature } else { fmt.Fprintf(writer, " failed to connect: %v\n", err) } return err }) } + for _, node := range filerNodes { + pb.WithFilerClient(false, pb.ServerAddress(node.Address), commandEnv.option.GrpcDialOption, func(client filer_pb.SeaweedFilerClient) error { + fmt.Fprintf(writer, "* filer %s metadata sync time\n", node.Address) + selfSignature := filerSignatures[node] + for peer, peerSignature := range filerSignatures { + if selfSignature == peerSignature { + continue + } + if resp, err := client.KvGet(context.Background(), &filer_pb.KvGetRequest{Key: filer.GetPeerMetaOffsetKey(peerSignature)}); err == nil && len(resp.Value) == 8 { + lastTsNs := int64(util.BytesToUint64(resp.Value)) + fmt.Fprintf(writer, " %s: %v\n", peer.Address, time.Unix(0, lastTsNs).UTC()) + } + } + return nil + }) + } // collect volume servers var volumeServers []pb.ServerAddress |
