aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-09-24 09:43:00 -0700
committerChris Lu <chris.lu@gmail.com>2020-09-24 09:43:00 -0700
commit1295347958e0954a5e1f053cd7ab52576fb8151e (patch)
treed6a441d3b49be7e4fdb76e08ec66f78487004741
parent5e239afdfc64ef39c5d4f41ec16410e726614eee (diff)
downloadseaweedfs-1295347958e0954a5e1f053cd7ab52576fb8151e.tar.xz
seaweedfs-1295347958e0954a5e1f053cd7ab52576fb8151e.zip
adjust hardlink update
simplify logic, pass entity content directly to hard link. The "weed mount" handles the logic to calculate hard link counter.
-rw-r--r--weed/filer/filerstore.go201
-rw-r--r--weed/filer/filerstore_hardlink.go95
-rw-r--r--weed/filesys/dir_link.go46
3 files changed, 119 insertions, 223 deletions
diff --git a/weed/filer/filerstore.go b/weed/filer/filerstore.go
index 7dc778562..888168581 100644
--- a/weed/filer/filerstore.go
+++ b/weed/filer/filerstore.go
@@ -3,8 +3,6 @@ package filer
import (
"context"
"errors"
- "fmt"
- "github.com/chrislusf/seaweedfs/weed/glog"
"strings"
"time"
@@ -82,29 +80,8 @@ func (fsw *FilerStoreWrapper) InsertEntry(ctx context.Context, entry *Entry) err
entry.Mime = ""
}
- if entry.HardLinkId != 0 {
- // check what is existing entry
- existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath)
-
- if err == nil && entry.HardLinkId == existingEntry.HardLinkId {
- // updating the same entry
- if err := fsw.updateHardLink(ctx, entry); err != nil {
- return err
- }
- return nil
- } else {
- if err == nil && existingEntry.HardLinkId != 0 {
- // break away from the old hard link
- if err := fsw.DeleteHardLink(ctx, entry.HardLinkId); err != nil {
- return err
- }
- }
- // CreateLink 1.2 : update new file to hardlink mode
- // update one existing hard link, counter ++
- if err := fsw.increaseHardLink(ctx, entry.HardLinkId); err != nil {
- return err
- }
- }
+ if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
+ return err
}
return fsw.ActualStore.InsertEntry(ctx, entry)
@@ -122,50 +99,8 @@ func (fsw *FilerStoreWrapper) UpdateEntry(ctx context.Context, entry *Entry) err
entry.Mime = ""
}
- if entry.HardLinkId != 0 {
- // handle hard link
-
- // check what is existing entry
- existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath)
- if err != nil {
- return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
- }
-
- err = fsw.maybeReadHardLink(ctx, &Entry{HardLinkId: entry.HardLinkId})
- if err == ErrKvNotFound {
-
- // CreateLink 1.1 : split source entry into hardlink+empty_entry
-
- // create hard link from existing entry, counter ++
- existingEntry.HardLinkId = entry.HardLinkId
- if err = fsw.createHardLink(ctx, existingEntry); err != nil {
- return fmt.Errorf("createHardLink %d: %v", existingEntry.HardLinkId, err)
- }
-
- // create the empty entry
- if err = fsw.ActualStore.UpdateEntry(ctx, &Entry{
- FullPath: entry.FullPath,
- HardLinkId: entry.HardLinkId,
- }); err != nil {
- return fmt.Errorf("UpdateEntry to link %d: %v", entry.FullPath, err)
- }
- return nil
- }
- if err != nil {
- return fmt.Errorf("update entry %s: %v", entry.FullPath, err)
- }
-
- if entry.HardLinkId != existingEntry.HardLinkId {
- // if different hard link id, moving to a new hard link
- glog.Fatalf("unexpected. update entry to a new link. not implemented yet.")
- } else {
- // updating hardlink with new metadata
- if err = fsw.updateHardLink(ctx, entry); err != nil {
- return fmt.Errorf("updateHardLink %d from %s: %v", entry.HardLinkId, entry.FullPath, err)
- }
- }
-
- return nil
+ if err := fsw.handleUpdateToHardLinks(ctx, entry); err != nil {
+ return err
}
return fsw.ActualStore.UpdateEntry(ctx, entry)
@@ -318,131 +253,3 @@ func (fsw *FilerStoreWrapper) KvGet(ctx context.Context, key []byte) (value []by
func (fsw *FilerStoreWrapper) KvDelete(ctx context.Context, key []byte) (err error) {
return fsw.ActualStore.KvDelete(ctx, key)
}
-
-func (fsw *FilerStoreWrapper) createHardLink(ctx context.Context, entry *Entry) error {
- if entry.HardLinkId == 0 {
- return nil
- }
- key := entry.HardLinkId.Key()
-
- _, err := fsw.KvGet(ctx, key)
- if err != ErrKvNotFound {
- return fmt.Errorf("create hardlink %d: already exists: %v", entry.HardLinkId, err)
- }
-
- entry.HardLinkCounter = 1
-
- newBlob, encodeErr := entry.EncodeAttributesAndChunks()
- if encodeErr != nil {
- return encodeErr
- }
-
- return fsw.KvPut(ctx, key, newBlob)
-}
-
-func (fsw *FilerStoreWrapper) updateHardLink(ctx context.Context, entry *Entry) error {
- if entry.HardLinkId == 0 {
- return nil
- }
- key := entry.HardLinkId.Key()
-
- value, err := fsw.KvGet(ctx, key)
- if err == ErrKvNotFound {
- return fmt.Errorf("update hardlink %d: missing", entry.HardLinkId)
- }
- if err != nil {
- return fmt.Errorf("update hardlink %d err: %v", entry.HardLinkId, err)
- }
-
- existingEntry := &Entry{}
- if err = existingEntry.DecodeAttributesAndChunks(value); err != nil {
- return err
- }
-
- entry.HardLinkCounter = existingEntry.HardLinkCounter
-
- newBlob, encodeErr := entry.EncodeAttributesAndChunks()
- if encodeErr != nil {
- return encodeErr
- }
-
- return fsw.KvPut(ctx, key, newBlob)
-}
-
-func (fsw *FilerStoreWrapper) increaseHardLink(ctx context.Context, hardLinkId HardLinkId) error {
- if hardLinkId == 0 {
- return nil
- }
- key := hardLinkId.Key()
-
- value, err := fsw.KvGet(ctx, key)
- if err == ErrKvNotFound {
- return fmt.Errorf("increaseHardLink %d: missing", hardLinkId)
- }
- if err != nil {
- return fmt.Errorf("increaseHardLink %d err: %v", hardLinkId, err)
- }
-
- existingEntry := &Entry{}
- if err = existingEntry.DecodeAttributesAndChunks(value); err != nil {
- return err
- }
-
- existingEntry.HardLinkCounter++
-
- newBlob, encodeErr := existingEntry.EncodeAttributesAndChunks()
- if encodeErr != nil {
- return encodeErr
- }
-
- return fsw.KvPut(ctx, key, newBlob)
-}
-
-func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error {
- if entry.HardLinkId == 0 {
- return nil
- }
- key := entry.HardLinkId.Key()
-
- value, err := fsw.KvGet(ctx, key)
- if err != nil {
- glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
- return err
- }
-
- if err = entry.DecodeAttributesAndChunks(value); err != nil {
- glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
- return err
- }
-
- return nil
-}
-
-func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error {
- key := hardLinkId.Key()
- value, err := fsw.KvGet(ctx, key)
- if err == ErrKvNotFound {
- return nil
- }
- if err != nil {
- return err
- }
-
- entry := &Entry{}
- if err = entry.DecodeAttributesAndChunks(value); err != nil {
- return err
- }
-
- entry.HardLinkCounter--
- if entry.HardLinkCounter <= 0 {
- return fsw.KvDelete(ctx, key)
- }
-
- newBlob, encodeErr := entry.EncodeAttributesAndChunks()
- if encodeErr != nil {
- return encodeErr
- }
-
- return fsw.KvPut(ctx, key, newBlob)
-
-}
diff --git a/weed/filer/filerstore_hardlink.go b/weed/filer/filerstore_hardlink.go
new file mode 100644
index 000000000..ec768b9cb
--- /dev/null
+++ b/weed/filer/filerstore_hardlink.go
@@ -0,0 +1,95 @@
+package filer
+
+import (
+ "context"
+ "fmt"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+)
+
+func (fsw *FilerStoreWrapper) handleUpdateToHardLinks(ctx context.Context, entry *Entry) error {
+ if entry.HardLinkId == 0 {
+ return nil
+ }
+ // handle hard links
+ if err := fsw.setHardLink(ctx, entry); err != nil {
+ return fmt.Errorf("setHardLink %d: %v", entry.HardLinkId, err)
+ }
+
+ // check what is existing entry
+ existingEntry, err := fsw.ActualStore.FindEntry(ctx, entry.FullPath)
+ if err != nil && err != filer_pb.ErrNotFound {
+ return fmt.Errorf("update existing entry %s: %v", entry.FullPath, err)
+ }
+
+ // remove old hard link
+ if err == nil && existingEntry.HardLinkId != entry.HardLinkId {
+ if err = fsw.DeleteHardLink(ctx, existingEntry.HardLinkId); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+func (fsw *FilerStoreWrapper) setHardLink(ctx context.Context, entry *Entry) error {
+ if entry.HardLinkId == 0 {
+ return nil
+ }
+ key := entry.HardLinkId.Key()
+
+ newBlob, encodeErr := entry.EncodeAttributesAndChunks()
+ if encodeErr != nil {
+ return encodeErr
+ }
+
+ return fsw.KvPut(ctx, key, newBlob)
+}
+
+func (fsw *FilerStoreWrapper) maybeReadHardLink(ctx context.Context, entry *Entry) error {
+ if entry.HardLinkId == 0 {
+ return nil
+ }
+ key := entry.HardLinkId.Key()
+
+ value, err := fsw.KvGet(ctx, key)
+ if err != nil {
+ glog.Errorf("read %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
+ return err
+ }
+
+ if err = entry.DecodeAttributesAndChunks(value); err != nil {
+ glog.Errorf("decode %s hardlink %d: %v", entry.FullPath, entry.HardLinkId, err)
+ return err
+ }
+
+ return nil
+}
+
+func (fsw *FilerStoreWrapper) DeleteHardLink(ctx context.Context, hardLinkId HardLinkId) error {
+ key := hardLinkId.Key()
+ value, err := fsw.KvGet(ctx, key)
+ if err == ErrKvNotFound {
+ return nil
+ }
+ if err != nil {
+ return err
+ }
+
+ entry := &Entry{}
+ if err = entry.DecodeAttributesAndChunks(value); err != nil {
+ return err
+ }
+
+ entry.HardLinkCounter--
+ if entry.HardLinkCounter <= 0 {
+ return fsw.KvDelete(ctx, key)
+ }
+
+ newBlob, encodeErr := entry.EncodeAttributesAndChunks()
+ if encodeErr != nil {
+ return encodeErr
+ }
+
+ return fsw.KvPut(ctx, key, newBlob)
+
+}
diff --git a/weed/filesys/dir_link.go b/weed/filesys/dir_link.go
index c15aed863..ddc3248bd 100644
--- a/weed/filesys/dir_link.go
+++ b/weed/filesys/dir_link.go
@@ -32,31 +32,28 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
}
// update old file to hardlink mode
- var updateOldEntryRequest *filer_pb.UpdateEntryRequest
- var hardLinkId filer.HardLinkId
- if oldFile.entry.HardLinkId != 0 {
- hardLinkId = filer.HardLinkId(oldFile.entry.HardLinkId)
- } else {
- // CreateLink 1.1 : split source entry into hardlink+empty_entry
- hardLinkId = filer.HardLinkId(util.RandomInt64())
- updateOldEntryRequest = &filer_pb.UpdateEntryRequest{
- Directory: oldFile.dir.FullPath(),
- Entry: &filer_pb.Entry{
- Name: oldFile.entry.Name,
- IsDirectory: oldFile.entry.IsDirectory,
- HardLinkId: int64(hardLinkId),
- },
- Signatures: []int32{dir.wfs.signature},
- }
+ if oldFile.entry.HardLinkId == 0 {
+ oldFile.entry.HardLinkId = util.RandomInt64()
+ oldFile.entry.HardLinkCounter = 1
+ }
+ oldFile.entry.HardLinkCounter++
+ updateOldEntryRequest := &filer_pb.UpdateEntryRequest{
+ Directory: oldFile.dir.FullPath(),
+ Entry: oldFile.entry,
+ Signatures: []int32{dir.wfs.signature},
}
// CreateLink 1.2 : update new file to hardlink mode
request := &filer_pb.CreateEntryRequest{
Directory: dir.FullPath(),
Entry: &filer_pb.Entry{
- Name: req.NewName,
- IsDirectory: false,
- HardLinkId: int64(hardLinkId),
+ Name: req.NewName,
+ IsDirectory: false,
+ Attributes: oldFile.entry.Attributes,
+ Chunks: oldFile.entry.Chunks,
+ Extended: oldFile.entry.Extended,
+ HardLinkId: oldFile.entry.HardLinkId,
+ HardLinkCounter: oldFile.entry.HardLinkCounter,
},
Signatures: []int32{dir.wfs.signature},
}
@@ -67,14 +64,11 @@ func (dir *Dir) Link(ctx context.Context, req *fuse.LinkRequest, old fs.Node) (f
dir.wfs.mapPbIdFromLocalToFiler(request.Entry)
defer dir.wfs.mapPbIdFromFilerToLocal(request.Entry)
- if updateOldEntryRequest != nil {
- if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil {
- glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err)
- return fuse.EIO
- }
- dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry))
- oldFile.entry.HardLinkId = int64(hardLinkId)
+ if err := filer_pb.UpdateEntry(client, updateOldEntryRequest); err != nil {
+ glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err)
+ return fuse.EIO
}
+ dir.wfs.metaCache.UpdateEntry(context.Background(), filer.FromPbEntry(updateOldEntryRequest.Directory, updateOldEntryRequest.Entry))
if err := filer_pb.CreateEntry(client, request); err != nil {
glog.V(0).Infof("Link %v/%v -> %s/%s: %v", oldFile.dir.FullPath(), oldFile.Name, dir.FullPath(), req.NewName, err)