diff options
| author | Chris Lu <chris.lu@gmail.com> | 2020-09-24 03:06:44 -0700 |
|---|---|---|
| committer | Chris Lu <chris.lu@gmail.com> | 2020-09-24 03:06:48 -0700 |
| commit | 5e239afdfc64ef39c5d4f41ec16410e726614eee (patch) | |
| tree | 8f1088f327777603dbdcb1d137ddc26efd18dfa6 /weed/filer | |
| parent | c7d7b1a0f6e396e481c184c419e89675435f0e18 (diff) | |
| download | seaweedfs-5e239afdfc64ef39c5d4f41ec16410e726614eee.tar.xz seaweedfs-5e239afdfc64ef39c5d4f41ec16410e726614eee.zip | |
hardlink works now
Diffstat (limited to 'weed/filer')
| -rw-r--r-- | weed/filer/entry.go | 34 | ||||
| -rw-r--r-- | weed/filer/entry_codec.go | 22 | ||||
| -rw-r--r-- | weed/filer/filer.go | 10 | ||||
| -rw-r--r-- | weed/filer/filer_delete_entry.go | 41 | ||||
| -rw-r--r-- | weed/filer/filerstore.go | 226 |
5 files changed, 311 insertions, 22 deletions
diff --git a/weed/filer/entry.go b/weed/filer/entry.go index 4a73de19a..45ede0e8e 100644 --- a/weed/filer/entry.go +++ b/weed/filer/entry.go @@ -37,6 +37,9 @@ type Entry struct { // the following is for files Chunks []*filer_pb.FileChunk `json:"chunks,omitempty"` + + HardLinkId HardLinkId + HardLinkCounter int32 } func (entry *Entry) Size() uint64 { @@ -56,11 +59,13 @@ func (entry *Entry) ToProtoEntry() *filer_pb.Entry { return nil } return &filer_pb.Entry{ - Name: entry.FullPath.Name(), - IsDirectory: entry.IsDirectory(), - Attributes: EntryAttributeToPb(entry), - Chunks: entry.Chunks, - Extended: entry.Extended, + Name: entry.FullPath.Name(), + IsDirectory: entry.IsDirectory(), + Attributes: EntryAttributeToPb(entry), + Chunks: entry.Chunks, + Extended: entry.Extended, + HardLinkId: int64(entry.HardLinkId), + HardLinkCounter: entry.HardLinkCounter, } } @@ -75,11 +80,24 @@ func (entry *Entry) ToProtoFullEntry() *filer_pb.FullEntry { } } +func (entry *Entry) Clone() *Entry { + return &Entry{ + FullPath: entry.FullPath, + Attr: entry.Attr, + Chunks: entry.Chunks, + Extended: entry.Extended, + HardLinkId: entry.HardLinkId, + HardLinkCounter: entry.HardLinkCounter, + } +} + func FromPbEntry(dir string, entry *filer_pb.Entry) *Entry { return &Entry{ - FullPath: util.NewFullPath(dir, entry.Name), - Attr: PbToEntryAttribute(entry.Attributes), - Chunks: entry.Chunks, + FullPath: util.NewFullPath(dir, entry.Name), + Attr: PbToEntryAttribute(entry.Attributes), + Chunks: entry.Chunks, + HardLinkId: HardLinkId(entry.HardLinkId), + HardLinkCounter: entry.HardLinkCounter, } } diff --git a/weed/filer/entry_codec.go b/weed/filer/entry_codec.go index fb6448b30..21531ad7a 100644 --- a/weed/filer/entry_codec.go +++ b/weed/filer/entry_codec.go @@ -13,9 +13,11 @@ import ( func (entry *Entry) EncodeAttributesAndChunks() ([]byte, error) { message := &filer_pb.Entry{ - Attributes: EntryAttributeToPb(entry), - Chunks: entry.Chunks, - Extended: entry.Extended, + Attributes: EntryAttributeToPb(entry), + Chunks: entry.Chunks, + Extended: entry.Extended, + HardLinkId: int64(entry.HardLinkId), + HardLinkCounter: entry.HardLinkCounter, } return proto.Marshal(message) } @@ -34,6 +36,9 @@ func (entry *Entry) DecodeAttributesAndChunks(blob []byte) error { entry.Chunks = message.Chunks + entry.HardLinkId = HardLinkId(message.HardLinkId) + entry.HardLinkCounter = message.HardLinkCounter + return nil } @@ -61,6 +66,10 @@ func PbToEntryAttribute(attr *filer_pb.FuseAttributes) Attr { t := Attr{} + if attr == nil { + return t + } + t.Crtime = time.Unix(attr.Crtime, 0) t.Mtime = time.Unix(attr.Mtime, 0) t.Mode = os.FileMode(attr.FileMode) @@ -106,6 +115,13 @@ func EqualEntry(a, b *Entry) bool { return false } } + + if a.HardLinkId != b.HardLinkId { + return false + } + if a.HardLinkCounter != b.HardLinkCounter { + return false + } return true } diff --git a/weed/filer/filer.go b/weed/filer/filer.go index acbe63486..5b0698211 100644 --- a/weed/filer/filer.go +++ b/weed/filer/filer.go @@ -28,7 +28,7 @@ var ( ) type Filer struct { - Store *FilerStoreWrapper + Store VirtualFilerStore MasterClient *wdclient.MasterClient fileIdDeletionQueue *util.UnboundedQueue GrpcDialOption grpc.DialOption @@ -314,3 +314,11 @@ func (f *Filer) Shutdown() { f.LocalMetaLogBuffer.Shutdown() f.Store.Shutdown() } + +func (f *Filer) maybeDeleteHardLinks(hardLinkIds []HardLinkId) { + for _, hardLinkId := range hardLinkIds { + if err := f.Store.DeleteHardLink(context.Background(), hardLinkId); err != nil { + glog.Errorf("delete hard link id %d : %v", hardLinkId, err) + } + } +} diff --git a/weed/filer/filer_delete_entry.go b/weed/filer/filer_delete_entry.go index 379156321..693b93f8b 100644 --- a/weed/filer/filer_delete_entry.go +++ b/weed/filer/filer_delete_entry.go @@ -10,6 +10,13 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) +type HardLinkId int64 +func (hardLinkId HardLinkId) Key() []byte{ + bytes := make([]byte, 8) + util.Uint64toBytes(bytes, uint64(hardLinkId)) + return bytes +} + func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (err error) { if p == "/" { return nil @@ -23,16 +30,19 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR isCollection := f.isBucket(entry) var chunks []*filer_pb.FileChunk + var hardLinkIds []HardLinkId chunks = append(chunks, entry.Chunks...) if entry.IsDirectory() { // delete the folder children, not including the folder itself var dirChunks []*filer_pb.FileChunk - dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster, signatures) + var dirHardLinkIds []HardLinkId + dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks && !isCollection, isFromOtherCluster, signatures) if err != nil { glog.V(0).Infof("delete directory %s: %v", p, err) return fmt.Errorf("delete directory %s: %v", p, err) } chunks = append(chunks, dirChunks...) + hardLinkIds = append(hardLinkIds, dirHardLinkIds...) } // delete the file or folder @@ -44,6 +54,12 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR if shouldDeleteChunks && !isCollection { go f.DeleteChunks(chunks) } + // A case not handled: + // what if the chunk is in a different collection? + if shouldDeleteChunks { + f.maybeDeleteHardLinks(hardLinkIds) + } + if isCollection { collectionName := entry.Name() f.doDeleteCollection(collectionName) @@ -53,7 +69,7 @@ func (f *Filer) DeleteEntryMetaAndData(ctx context.Context, p util.FullPath, isR return nil } -func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, err error) { +func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry, isRecursive, ignoreRecursiveError, shouldDeleteChunks, isFromOtherCluster bool, signatures []int32) (chunks []*filer_pb.FileChunk, hardlinkIds []HardLinkId, err error) { lastFileName := "" includeLastFile := false @@ -61,26 +77,33 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry entries, err := f.ListDirectoryEntries(ctx, entry.FullPath, lastFileName, includeLastFile, PaginationSize, "") if err != nil { glog.Errorf("list folder %s: %v", entry.FullPath, err) - return nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) + return nil, nil, fmt.Errorf("list folder %s: %v", entry.FullPath, err) } if lastFileName == "" && !isRecursive && len(entries) > 0 { // only for first iteration in the loop glog.Errorf("deleting a folder %s has children: %+v ...", entry.FullPath, entries[0].Name()) - return nil, fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) + return nil, nil,fmt.Errorf("fail to delete non-empty folder: %s", entry.FullPath) } for _, sub := range entries { lastFileName = sub.Name() var dirChunks []*filer_pb.FileChunk + var dirHardLinkIds []HardLinkId if sub.IsDirectory() { - dirChunks, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false, nil) + dirChunks, dirHardLinkIds, err = f.doBatchDeleteFolderMetaAndData(ctx, sub, isRecursive, ignoreRecursiveError, shouldDeleteChunks, false, nil) chunks = append(chunks, dirChunks...) + hardlinkIds = append(hardlinkIds, dirHardLinkIds...) } else { f.NotifyUpdateEvent(ctx, sub, nil, shouldDeleteChunks, isFromOtherCluster, nil) - chunks = append(chunks, sub.Chunks...) + if sub.HardLinkId != 0 { + // hard link chunk data are deleted separately + hardlinkIds = append(hardlinkIds, sub.HardLinkId) + } else { + chunks = append(chunks, sub.Chunks...) + } } if err != nil && !ignoreRecursiveError { - return nil, err + return nil, nil, err } } @@ -92,12 +115,12 @@ func (f *Filer) doBatchDeleteFolderMetaAndData(ctx context.Context, entry *Entry glog.V(3).Infof("deleting directory %v delete %d chunks: %v", entry.FullPath, len(chunks), shouldDeleteChunks) if storeDeletionErr := f.Store.DeleteFolderChildren(ctx, entry.FullPath); storeDeletionErr != nil { - return nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) + return nil, nil, fmt.Errorf("filer store delete: %v", storeDeletionErr) } f.NotifyUpdateEvent(ctx, entry, nil, shouldDeleteChunks, isFromOtherCluster, signatures) - return chunks, nil + return chunks, hardlinkIds, nil } func (f *Filer) doDeleteEntryMetaAndData(ctx context.Context, entry *Entry, shouldDeleteChunks bool, isFromOtherCluster bool, signatures []int32) (err error) { diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index d313b7ba3..7dc778562 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -3,6 +3,8 @@ package filer import ( "context" "errors" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" "strings" "time" @@ -24,7 +26,7 @@ type FilerStore interface { Initialize(configuration util.Configuration, prefix string) error InsertEntry(context.Context, *Entry) error UpdateEntry(context.Context, *Entry) (err error) - // err == filer2.ErrNotFound if not found + // err == filer_pb.ErrNotFound if not found FindEntry(context.Context, util.FullPath) (entry *Entry, err error) DeleteEntry(context.Context, util.FullPath) (err error) DeleteFolderChildren(context.Context, util.FullPath) (err error) @@ -42,6 +44,11 @@ type FilerStore interface { Shutdown() } +type VirtualFilerStore interface { + FilerStore + DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error +} + type FilerStoreWrapper struct { ActualStore FilerStore } @@ -74,6 +81,32 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err if entry.Mime == "application/octet-stream" { entry.Mime = "" } + + if entry.HardLinkId != 0 { + // check what is existing entry + existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath) + + if err == nil && entry.HardLinkId == existingEntry.HardLinkId { + // updating the same entry + if err := fsw.updateHardLink(ctx, entry); err != nil { + return err + } + return nil + } else { + if err == nil && existingEntry.HardLinkId != 0 { + // break away from the old hard link + if err := fsw.DeleteHardLink(ctx, entry.HardLinkId); err != nil { + return err + } + } + // CreateLink 1.2 : update new file to hardlink mode + // update one existing hard link, counter ++ + if err := fsw.increaseHardLink(ctx, entry.HardLinkId); err != nil { + return err + } + } + } + return fsw.ActualStore.InsertEntry(ctx, entry) } @@ -88,6 +121,53 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err if entry.Mime == "application/octet-stream" { entry.Mime = "" } + + if entry.HardLinkId != 0 { + // handle hard link + + // check what is existing entry + existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath) + if err != nil { + return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err) + } + + err = fsw.maybeReadHardLink(ctx, &Entry{HardLinkId: entry.HardLinkId}) + if err == ErrKvNotFound { + + // CreateLink 1.1 : split source entry into hardlink+empty_entry + + // create hard link from existing entry, counter ++ + existingEntry.HardLinkId = entry.HardLinkId + if err = fsw.createHardLink(ctx, existingEntry); err != nil { + return fmt.Errorf("createHardLink %d: %v", existingEntry.HardLinkId, err) + } + + // create the empty entry + if err = fsw.ActualStore.UpdateEntry(ctx, &Entry{ + FullPath: entry.FullPath, + HardLinkId: entry.HardLinkId, + }); err != nil { + return fmt.Errorf("UpdateEntry to link %d: %v", entry.FullPath, err) + } + return nil + } + if err != nil { + return fmt.Errorf("update entry %s: %v", entry.FullPath, err) + } + + if entry.HardLinkId != existingEntry.HardLinkId { + // if different hard link id, moving to a new hard link + glog.Fatalf("unexpected. update entry to a new link. not implemented yet.") + } else { + // updating hardlink with new metadata + if err = fsw.updateHardLink(ctx, entry); err != nil { + return fmt.Errorf("updateHardLink %d from %s: %v", entry.HardLinkId, entry.FullPath, err) + } + } + + return nil + } + return fsw.ActualStore.UpdateEntry(ctx, entry) } @@ -102,6 +182,9 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) ( if err != nil { return nil, err } + + fsw.maybeReadHardLink(ctx, entry) + filer_pb.AfterEntryDeserialization(entry.Chunks) return } @@ -113,6 +196,17 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) }() + existingEntry, findErr := fsw.FindEntry(ctx, fp) + if findErr == filer_pb.ErrNotFound { + return nil + } + if existingEntry.HardLinkId != 0 { + // remove hard link + if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { + return err + } + } + return fsw.ActualStore.DeleteEntry(ctx, fp) } @@ -138,6 +232,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath return nil, err } for _, entry := range entries { + fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.Chunks) } return entries, err @@ -157,6 +252,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, return nil, err } for _, entry := range entries { + fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.Chunks) } return entries, nil @@ -222,3 +318,131 @@ func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []by func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) { return fsw.ActualStore.KvDelete(ctx, key) } + +func (fsw *FilerStoreWrapper) createHardLink(ctx context.Context, entry *Entry) error { + if entry.HardLinkId == 0 { + return nil + } + key := entry.HardLinkId.Key() + + _, err := fsw.KvGet(ctx, key) + if err != ErrKvNotFound { + return fmt.Errorf("create hardlink %d: already exists: %v", entry.HardLinkId, err) + } + + entry.HardLinkCounter = 1 + + newBlob, encodeErr := entry.EncodeAttributesAndChunks() + if encodeErr != nil { + return encodeErr + } + + return fsw.KvPut(ctx, key, newBlob) +} + +func (fsw *FilerStoreWrapper) updateHardLink(ctx context.Context, entry *Entry) error { + if entry.HardLinkId == 0 { + return nil + } + key := entry.HardLinkId.Key() + + value, err := fsw.KvGet(ctx, key) + if err == ErrKvNotFound { + return fmt.Errorf("update hardlink %d: missing", entry.HardLinkId) + } + if err != nil { + return fmt.Errorf("update hardlink %d err: %v", entry.HardLinkId, err) + } + + existingEntry := &Entry{} + if err = existingEntry.DecodeAttributesAndChunks(value); err != nil { + return err + } + + entry.HardLinkCounter = existingEntry.HardLinkCounter + + newBlob, encodeErr := entry.EncodeAttributesAndChunks() + if encodeErr != nil { + return encodeErr + } + + return fsw.KvPut(ctx, key, newBlob) +} + +func (fsw *FilerStoreWrapper) increaseHardLink(ctx context.Context, hardLinkId HardLinkId) error { + if hardLinkId == 0 { + return nil + } + key := hardLinkId.Key() + + value, err := fsw.KvGet(ctx, key) + if err == ErrKvNotFound { + return fmt.Errorf("increaseHardLink %d: missing", hardLinkId) + } + if err != nil { + return fmt.Errorf("increaseHardLink %d err: %v", hardLinkId, err) + } + + existingEntry := &Entry{} + if err = existingEntry.DecodeAttributesAndChunks(value); err != nil { + return err + } + + existingEntry.HardLinkCounter++ + + newBlob, encodeErr := existingEntry.EncodeAttributesAndChunks() + if encodeErr != nil { + return encodeErr + } + + return fsw.KvPut(ctx, key, newBlob) +} + +func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error { + if entry.HardLinkId == 0 { + return nil + } + key := entry.HardLinkId.Key() + + value, err := fsw.KvGet(ctx, key) + if err != nil { + glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err) + return err + } + + if err = entry.DecodeAttributesAndChunks(value); err != nil { + glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err) + return err + } + + return nil +} + +func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error { + key := hardLinkId.Key() + value, err := fsw.KvGet(ctx, key) + if err == ErrKvNotFound { + return nil + } + if err != nil { + return err + } + + entry := &Entry{} + if err = entry.DecodeAttributesAndChunks(value); err != nil { + return err + } + + entry.HardLinkCounter-- + if entry.HardLinkCounter <= 0 { + return fsw.KvDelete(ctx, key) + } + + newBlob, encodeErr := entry.EncodeAttributesAndChunks() + if encodeErr != nil { + return encodeErr + } + + return fsw.KvPut(ctx, key, newBlob) + +} |
