diff options
Diffstat (limited to 'weed/filer/filerstore.go')
| -rw-r--r-- | weed/filer/filerstore.go | 226 |
1 files changed, 225 insertions, 1 deletions
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go index d313b7ba3..7dc778562 100644 --- a/weed/filer/filerstore.go +++ b/weed/filer/filerstore.go @@ -3,6 +3,8 @@ package filer import ( "context" "errors" + "fmt" + "github.com/chrislusf/seaweedfs/weed/glog" "strings" "time" @@ -24,7 +26,7 @@ type FilerStore interface { Initialize(configuration util.Configuration, prefix string) error InsertEntry(context.Context, *Entry) error UpdateEntry(context.Context, *Entry) (err error) - // err == filer2.ErrNotFound if not found + // err == filer_pb.ErrNotFound if not found FindEntry(context.Context, util.FullPath) (entry *Entry, err error) DeleteEntry(context.Context, util.FullPath) (err error) DeleteFolderChildren(context.Context, util.FullPath) (err error) @@ -42,6 +44,11 @@ type FilerStore interface { Shutdown() } +type VirtualFilerStore interface { + FilerStore + DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error +} + type FilerStoreWrapper struct { ActualStore FilerStore } @@ -74,6 +81,32 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err if entry.Mime == "application/octet-stream" { entry.Mime = "" } + + if entry.HardLinkId != 0 { + // check what is existing entry + existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath) + + if err == nil && entry.HardLinkId == existingEntry.HardLinkId { + // updating the same entry + if err := fsw.updateHardLink(ctx, entry); err != nil { + return err + } + return nil + } else { + if err == nil && existingEntry.HardLinkId != 0 { + // break away from the old hard link + if err := fsw.DeleteHardLink(ctx, entry.HardLinkId); err != nil { + return err + } + } + // CreateLink 1.2 : update new file to hardlink mode + // update one existing hard link, counter ++ + if err := fsw.increaseHardLink(ctx, entry.HardLinkId); err != nil { + return err + } + } + } + return fsw.ActualStore.InsertEntry(ctx, entry) } @@ -88,6 +121,53 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err if entry.Mime == "application/octet-stream" { entry.Mime = "" } + + if entry.HardLinkId != 0 { + // handle hard link + + // check what is existing entry + existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath) + if err != nil { + return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err) + } + + err = fsw.maybeReadHardLink(ctx, &Entry{HardLinkId: entry.HardLinkId}) + if err == ErrKvNotFound { + + // CreateLink 1.1 : split source entry into hardlink+empty_entry + + // create hard link from existing entry, counter ++ + existingEntry.HardLinkId = entry.HardLinkId + if err = fsw.createHardLink(ctx, existingEntry); err != nil { + return fmt.Errorf("createHardLink %d: %v", existingEntry.HardLinkId, err) + } + + // create the empty entry + if err = fsw.ActualStore.UpdateEntry(ctx, &Entry{ + FullPath: entry.FullPath, + HardLinkId: entry.HardLinkId, + }); err != nil { + return fmt.Errorf("UpdateEntry to link %d: %v", entry.FullPath, err) + } + return nil + } + if err != nil { + return fmt.Errorf("update entry %s: %v", entry.FullPath, err) + } + + if entry.HardLinkId != existingEntry.HardLinkId { + // if different hard link id, moving to a new hard link + glog.Fatalf("unexpected. update entry to a new link. not implemented yet.") + } else { + // updating hardlink with new metadata + if err = fsw.updateHardLink(ctx, entry); err != nil { + return fmt.Errorf("updateHardLink %d from %s: %v", entry.HardLinkId, entry.FullPath, err) + } + } + + return nil + } + return fsw.ActualStore.UpdateEntry(ctx, entry) } @@ -102,6 +182,9 @@ func (fsw *FilerStoreWrapper) FindEntry(ctx context.Context, fp util.FullPath) ( if err != nil { return nil, err } + + fsw.maybeReadHardLink(ctx, entry) + filer_pb.AfterEntryDeserialization(entry.Chunks) return } @@ -113,6 +196,17 @@ func (fsw *FilerStoreWrapper) DeleteEntry(ctx context.Context, fp util.FullPath) stats.FilerStoreHistogram.WithLabelValues(fsw.ActualStore.GetName(), "delete").Observe(time.Since(start).Seconds()) }() + existingEntry, findErr := fsw.FindEntry(ctx, fp) + if findErr == filer_pb.ErrNotFound { + return nil + } + if existingEntry.HardLinkId != 0 { + // remove hard link + if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil { + return err + } + } + return fsw.ActualStore.DeleteEntry(ctx, fp) } @@ -138,6 +232,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryEntries(ctx context.Context, dirPath return nil, err } for _, entry := range entries { + fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.Chunks) } return entries, err @@ -157,6 +252,7 @@ func (fsw *FilerStoreWrapper) ListDirectoryPrefixedEntries(ctx context.Context, return nil, err } for _, entry := range entries { + fsw.maybeReadHardLink(ctx, entry) filer_pb.AfterEntryDeserialization(entry.Chunks) } return entries, nil @@ -222,3 +318,131 @@ func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []by func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) { return fsw.ActualStore.KvDelete(ctx, key) } + +func (fsw *FilerStoreWrapper) createHardLink(ctx context.Context, entry *Entry) error { + if entry.HardLinkId == 0 { + return nil + } + key := entry.HardLinkId.Key() + + _, err := fsw.KvGet(ctx, key) + if err != ErrKvNotFound { + return fmt.Errorf("create hardlink %d: already exists: %v", entry.HardLinkId, err) + } + + entry.HardLinkCounter = 1 + + newBlob, encodeErr := entry.EncodeAttributesAndChunks() + if encodeErr != nil { + return encodeErr + } + + return fsw.KvPut(ctx, key, newBlob) +} + +func (fsw *FilerStoreWrapper) updateHardLink(ctx context.Context, entry *Entry) error { + if entry.HardLinkId == 0 { + return nil + } + key := entry.HardLinkId.Key() + + value, err := fsw.KvGet(ctx, key) + if err == ErrKvNotFound { + return fmt.Errorf("update hardlink %d: missing", entry.HardLinkId) + } + if err != nil { + return fmt.Errorf("update hardlink %d err: %v", entry.HardLinkId, err) + } + + existingEntry := &Entry{} + if err = existingEntry.DecodeAttributesAndChunks(value); err != nil { + return err + } + + entry.HardLinkCounter = existingEntry.HardLinkCounter + + newBlob, encodeErr := entry.EncodeAttributesAndChunks() + if encodeErr != nil { + return encodeErr + } + + return fsw.KvPut(ctx, key, newBlob) +} + +func (fsw *FilerStoreWrapper) increaseHardLink(ctx context.Context, hardLinkId HardLinkId) error { + if hardLinkId == 0 { + return nil + } + key := hardLinkId.Key() + + value, err := fsw.KvGet(ctx, key) + if err == ErrKvNotFound { + return fmt.Errorf("increaseHardLink %d: missing", hardLinkId) + } + if err != nil { + return fmt.Errorf("increaseHardLink %d err: %v", hardLinkId, err) + } + + existingEntry := &Entry{} + if err = existingEntry.DecodeAttributesAndChunks(value); err != nil { + return err + } + + existingEntry.HardLinkCounter++ + + newBlob, encodeErr := existingEntry.EncodeAttributesAndChunks() + if encodeErr != nil { + return encodeErr + } + + return fsw.KvPut(ctx, key, newBlob) +} + +func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error { + if entry.HardLinkId == 0 { + return nil + } + key := entry.HardLinkId.Key() + + value, err := fsw.KvGet(ctx, key) + if err != nil { + glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err) + return err + } + + if err = entry.DecodeAttributesAndChunks(value); err != nil { + glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err) + return err + } + + return nil +} + +func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error { + key := hardLinkId.Key() + value, err := fsw.KvGet(ctx, key) + if err == ErrKvNotFound { + return nil + } + if err != nil { + return err + } + + entry := &Entry{} + if err = entry.DecodeAttributesAndChunks(value); err != nil { + return err + } + + entry.HardLinkCounter-- + if entry.HardLinkCounter <= 0 { + return fsw.KvDelete(ctx, key) + } + + newBlob, encodeErr := entry.EncodeAttributesAndChunks() + if encodeErr != nil { + return encodeErr + } + + return fsw.KvPut(ctx, key, newBlob) + +} |
