diff options
| author | hilimd <68371223+hilimd@users.noreply.github.com> | 2020-09-25 09:45:56 +0800 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2020-09-25 09:45:56 +0800 |
| commit | 76e24a5660a2192603b7d6d84aef1924ab95cb94 (patch) | |
| tree | 72e1b320c3487aa5f6cb3cd5fc849bfd94108ec8 /weed/filer | |
| parent | 48c578410fea2128f81356250b2cd9d56074d878 (diff) | |
| parent | 043b0631369bec00b33eb53cdf2cdef3eced006c (diff) | |
| download | seaweedfs-76e24a5660a2192603b7d6d84aef1924ab95cb94.tar.xz seaweedfs-76e24a5660a2192603b7d6d84aef1924ab95cb94.zip | |
Merge pull request #20 from chrislusf/master
sync
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/entry.go | 34 | ||||
| -rw-r--r-- | weed/filer/entry_codec.go | 22 | ||||
| -rw-r--r-- | weed/filer/filechunk_manifest.go | 2 | ||||
| -rw-r--r-- | weed/filer/filer.go | 10 | ||||
| -rw-r--r-- | weed/filer/filer_delete_entry.go | 36 | ||||
| -rw-r--r-- | weed/filer/filerstore.go | 33 | ||||
| -rw-r--r-- | weed/filer/filerstore_hardlink.go | 96 | ||||
| -rw-r--r-- | weed/filer/stream.go | 6 |
8 files changed, 213 insertions, 26 deletions
diff --git a/weed/filer/entry.go b/weed/filer/entry.go index 4a73de19a..421e51432 100644 --- a/weed/filer/entry.go +++ b/weed/filer/entry.go @@ -37,6 +37,9 @@ type Entry struct { // the following is for files Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"` + + HardLinkId HardLinkId + HardLinkCounter int32 } func (entry *Entry) Size() uint64 { @@ -56,11 +59,13 @@ func (entry *Entry) ToProtoEntry() *filer_pb.Entry { return nil } return &filer_pb.Entry{ - Name: entry.FullPath.Name(), - IsDirectory: entry.IsDirectory(), - Attributes: EntryAttributeToPb(entry), - Chunks: entry.Chunks, - Extended: entry.Extended, + Name: entry.FullPath.Name(), + IsDirectory: entry.IsDirectory(), + Attributes: EntryAttributeToPb(entry), + Chunks: entry.Chunks, + Extended: entry.Extended, + HardLinkId: entry.HardLinkId, + HardLinkCounter: entry.HardLinkCounter, } } @@ -75,11 +80,24 @@ func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry { } } +func (entry *Entry) Clone() *Entry { + return &Entry{ + FullPath: entry.FullPath, + Attr: entry.Attr, + Chunks: entry.Chunks, + Extended: entry.Extended, + HardLinkId: entry.HardLinkId, + HardLinkCounter: entry.HardLinkCounter, + } +} + func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry { return &Entry{ - FullPath: util.NewFullPath(dir, entry.Name), - Attr: PbToEntryAttribute(entry.Attributes), - Chunks: entry.Chunks, + FullPath: util.NewFullPath(dir, entry.Name), + Attr: PbToEntryAttribute(entry.Attributes), + Chunks: entry.Chunks, + HardLinkId: HardLinkId(entry.HardLinkId), + HardLinkCounter: entry.HardLinkCounter, } } diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go index fb6448b30..884fb2670 100644 --- a/weed/filer/entry_codec.go +++ b/weed/filer/entry_codec.go @@ -13,9 +13,11 @@ import ( func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) { message := &filer_pb.Entry{ - Attributes: EntryAttributeToPb(entry), - Chunks: entry.Chunks, - Extended: entry.Extended, + Attributes: EntryAttributeToPb(entry), + Chunks: entry.Chunks, + Extended: entry.Extended, + HardLinkId: entry.HardLinkId, + HardLinkCounter: entry.HardLinkCounter, } return proto.Marshal(message) } @@ -34,6 +36,9 @@ func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error { entry.Chunks = message.Chunks + entry.HardLinkId = message.HardLinkId + entry.HardLinkCounter = message.HardLinkCounter + return nil } @@ -61,6 +66,10 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr { t := Attr{} + if attr == nil { + return t + } + t.Crtime = time.Unix(attr.Crtime, 0) t.Mtime = time.Unix(attr.Mtime, 0) t.Mode = os.FileMode(attr.FileMode) @@ -106,6 +115,13 @@ func EqualEntry(a, b *Entry) bool { return false } } + + if !bytes.Equal(a.HardLinkId, b.HardLinkId) { + return false + } + if a.HardLinkCounter != b.HardLinkCounter { + return false + } return true } diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index e84cf21e5..37b172357 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -90,7 +90,7 @@ func fetchChunk(lookupFileIdFn LookupFileIdFunctionType, fileId string, cipherKe return nil, err } var buffer bytes.Buffer - err = util.ReadUrlAsStream(urlString+"?readDeleted=true", cipherKey, isGzipped, true, 0, 0, func(data []byte) { + err = util.ReadUrlAsStream(urlString, cipherKey, isGzipped, true, 0, 0, func(data []byte) { buffer.Write(data) }) if err != nil { diff --git a/weed/filer/filer.go b/weed/filer/filer.go index acbe63486..5b0698211 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -28,7 +28,7 @@ var ( ) type Filer struct { - Store *FilerStoreWrapper + Store VirtualFilerStore MasterClient *wdclient.MasterClient fileIdDeletionQueue *util.UnboundedQueue GrpcDialOption grpc.DialOption @@ -314,3 +314,11 @@ func (f *Filer) Shutdown() { f.LocalMetaLogBuffer.Shutdown() f.Store.Shutdown() } + +func (f *Filer) maybeDeleteHardLinks(hardLinkIds []HardLinkId) { + for _, hardLinkId := range hardLinkIds { + if err := f.Store.DeleteHardLink(context.Background(), hardLinkId); err != nil { + glog.Errorf("delete hard link id %d : %v", hardLinkId, err) + } + } +} diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index 379156321..6c9ff56d3 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -10,6 +10,8 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +type HardLinkId []byte + func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) { if p == "/" { return nil @@ -23,16 +25,19 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR isCollection := f.isBucket(entry) var chunks []*filer_pb.FileChunk + var hardLinkIds []HardLinkId chunks = append(chunks, entry.Chunks...) if entry.IsDirectory() { // delete the folder children, not including the folder itself var dirChunks []*filer_pb.FileChunk - dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster, signatures) + var dirHardLinkIds []HardLinkId + dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster, signatures) if err != nil { glog.V(0).Infof("delete directory %s: %v", p, err) return fmt.Errorf("delete directory %s: %v", p, err) } chunks = append(chunks, dirChunks...) + hardLinkIds = append(hardLinkIds, dirHardLinkIds...) } // delete the file or folder @@ -44,6 +49,12 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR if shouldDeleteChunks && !isCollection { go f.DeleteChunks(chunks) } + // A case not handled: + // what if the chunk is in a different collection? + if shouldDeleteChunks { + f.maybeDeleteHardLinks(hardLinkIds) + } + if isCollection { collectionName := entry.Name() f.doDeleteCollection(collectionName) @@ -53,7 +64,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR return nil } -func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, err error) { +func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, hardlinkIds []HardLinkId, err error) { lastFileName := "" includeLastFile := false @@ -61,26 +72,33 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "") if err != nil { glog.Errorf("list folder %s: %v", entry.FullPath, err) - return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) + return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) } if lastFileName == "" && !isRecursive && len(entries) > 0 { // only for first iteration in the loop glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name()) - return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) + return nil, nil,fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) } for _, sub := range entries { lastFileName = sub.Name() var dirChunks []*filer_pb.FileChunk + var dirHardLinkIds []HardLinkId if sub.IsDirectory() { - dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false, nil) + dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false, nil) chunks = append(chunks, dirChunks...) + hardlinkIds = append(hardlinkIds, dirHardLinkIds...) } else { f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil) - chunks = append(chunks, sub.Chunks...) + if len(sub.HardLinkId) != 0 { + // hard link chunk data are deleted separately + hardlinkIds = append(hardlinkIds, sub.HardLinkId) + } else { + chunks = append(chunks, sub.Chunks...) + } } if err != nil && !ignoreRecursiveError { - return nil, err + return nil, nil, err } } @@ -92,12 +110,12 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks) if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { - return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) + return nil, nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) } f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures) - return chunks, nil + return chunks, hardlinkIds, nil } func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) { diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index d313b7ba3..11e30878d 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -24,7 +24,7 @@ type FilerStore interface { Initialize(configuration util.Configuration, prefix string) error InsertEntry(context.Context, *Entry) error UpdateEntry(context.Context, *Entry) (err error) - // err == filer2.ErrNotFound if not found + // err == filer_pb.ErrNotFound if not found FindEntry(context.Context, util.FullPath) (entry *Entry, err error) DeleteEntry(context.Context, util.FullPath) (err error) DeleteFolderChildren(context.Context, util.FullPath) (err error) @@ -42,6 +42,11 @@ type FilerStore interface { Shutdown() } +type VirtualFilerStore interface { + FilerStore + DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error +} + type FilerStoreWrapper struct { ActualStore FilerStore } @@ -74,6 +79,11 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err if entry.Mime == "application/octet-stream" { entry.Mime = "" } + + if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil { + return err + } + return fsw.ActualStore.InsertEntry(ctx, entry) } @@ -88,6 +98,11 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err if entry.Mime == "application/octet-stream" { entry.Mime = "" } + + if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil { + return err + } + return fsw.ActualStore.UpdateEntry(ctx, entry) } @@ -102,6 +117,9 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) ( if err != nil { return nil, err } + + fsw.maybeReadHardLink(ctx, entry) + filer_pb.AfterEntryDeserialization(entry.Chunks) return } @@ -113,6 +131,17 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) }() + existingEntry, findErr := fsw.FindEntry(ctx, fp) + if findErr == filer_pb.ErrNotFound { + return nil + } + if len(existingEntry.HardLinkId) != 0 { + // remove hard link + if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { + return err + } + } + return fsw.ActualStore.DeleteEntry(ctx, fp) } @@ -138,6 +167,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath return nil, err } for _, entry := range entries { + fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.Chunks) } return entries, err @@ -157,6 +187,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, return nil, err } for _, entry := range entries { + fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.Chunks) } return entries, nil diff --git a/weed/filer/filerstore_hardlink.go b/weed/filer/filerstore_hardlink.go new file mode 100644 index 000000000..0fbf8310e --- /dev/null +++ b/weed/filer/filerstore_hardlink.go @@ -0,0 +1,96 @@ +package filer + +import ( + "bytes" + "context" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" +) + +func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry *Entry) error { + if len(entry.HardLinkId) == 0 { + return nil + } + // handle hard links + if err := fsw.setHardLink(ctx, entry); err != nil { + return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err) + } + + // check what is existing entry + existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath) + if err != nil && err != filer_pb.ErrNotFound { + return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err) + } + + // remove old hard link + if err == nil && len(existingEntry.HardLinkId) != 0 && bytes.Compare(existingEntry.HardLinkId, entry.HardLinkId) != 0 { + if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { + return err + } + } + return nil +} + +func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) error { + if len(entry.HardLinkId) == 0 { + return nil + } + key := entry.HardLinkId + + newBlob, encodeErr := entry.EncodeAttributesAndChunks() + if encodeErr != nil { + return encodeErr + } + + return fsw.KvPut(ctx, key, newBlob) +} + +func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error { + if len(entry.HardLinkId) == 0 { + return nil + } + key := entry.HardLinkId + + value, err := fsw.KvGet(ctx, key) + if err != nil { + glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err) + return err + } + + if err = entry.DecodeAttributesAndChunks(value); err != nil { + glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err) + return err + } + + return nil +} + +func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error { + key := hardLinkId + value, err := fsw.KvGet(ctx, key) + if err == ErrKvNotFound { + return nil + } + if err != nil { + return err + } + + entry := &Entry{} + if err = entry.DecodeAttributesAndChunks(value); err != nil { + return err + } + + entry.HardLinkCounter-- + if entry.HardLinkCounter <= 0 { + return fsw.KvDelete(ctx, key) + } + + newBlob, encodeErr := entry.EncodeAttributesAndChunks() + if encodeErr != nil { + return encodeErr + } + + return fsw.KvPut(ctx, key, newBlob) + +} diff --git a/weed/filer/stream.go b/weed/filer/stream.go index 416359ebf..dc6e414ca 100644 --- a/weed/filer/stream.go +++ b/weed/filer/stream.go @@ -32,7 +32,7 @@ func StreamContent(masterClient *wdclient.MasterClient, w io.Writer, chunks []*f for _, chunkView := range chunkViews { urlString := fileId2Url[chunkView.FileId] - err := util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + err := util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { w.Write(data) }) if err != nil { @@ -63,7 +63,7 @@ func ReadAll(masterClient *wdclient.MasterClient, chunks []*filer_pb.FileChunk) glog.V(1).Infof("operation LookupFileId %s failed, err: %v", chunkView.FileId, err) return nil, err } - err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) if err != nil { @@ -175,7 +175,7 @@ func (c *ChunkStreamReader) fetchChunkToBuffer(chunkView *ChunkView) error { return err } var buffer bytes.Buffer - err = util.ReadUrlAsStream(urlString+"?readDeleted=true", chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { + err = util.ReadUrlAsStream(urlString, chunkView.CipherKey, chunkView.IsGzipped, chunkView.IsFullChunk(), chunkView.Offset, int(chunkView.Size), func(data []byte) { buffer.Write(data) }) if err != nil { |
