diff options
Diffstat (limited to 'weed/filer/filer_notify_append.go')
| -rw-r--r-- | weed/filer/filer_notify_append.go | 73 |
1 files changed, 73 insertions, 0 deletions
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go new file mode 100644 index 000000000..b1836b046 --- /dev/null +++ b/weed/filer/filer_notify_append.go @@ -0,0 +1,73 @@ +package filer + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/chrislusf/seaweedfs/weed/operation" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" + "github.com/chrislusf/seaweedfs/weed/util" +) + +func (f *Filer) appendToFile(targetFile string, data []byte) error { + + assignResult, uploadResult, err2 := f.assignAndUpload(data) + if err2 != nil { + return err2 + } + + // find out existing entry + fullpath := util.FullPath(targetFile) + entry, err := f.FindEntry(context.Background(), fullpath) + var offset int64 = 0 + if err == filer_pb.ErrNotFound { + entry = &Entry{ + FullPath: fullpath, + Attr: Attr{ + Crtime: time.Now(), + Mtime: time.Now(), + Mode: os.FileMode(0644), + Uid: OS_UID, + Gid: OS_GID, + }, + } + } else { + offset = int64(TotalSize(entry.Chunks)) + } + + // append to existing chunks + entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset)) + + // update the entry + err = f.CreateEntry(context.Background(), entry, false, false, nil) + + return err +} + +func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) { + // assign a volume location + assignRequest := &operation.VolumeAssignRequest{ + Count: 1, + Collection: f.metaLogCollection, + Replication: f.metaLogReplication, + WritableVolumeCount: 1, + } + assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest) + if err != nil { + return nil, nil, fmt.Errorf("AssignVolume: %v", err) + } + if assignResult.Error != "" { + return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error) + } + + // upload data + targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid + uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth) + if err != nil { + return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err) + } + // println("uploaded to", targetUrl) + return assignResult, uploadResult, nil +} |
