aboutsummaryrefslogtreecommitdiff
path: root/weed/filer/filer_notify_append.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filer/filer_notify_append.go')
-rw-r--r--weed/filer/filer_notify_append.go73
1 files changed, 73 insertions, 0 deletions
diff --git a/weed/filer/filer_notify_append.go b/weed/filer/filer_notify_append.go
new file mode 100644
index 000000000..b1836b046
--- /dev/null
+++ b/weed/filer/filer_notify_append.go
@@ -0,0 +1,73 @@
+package filer
+
+import (
+ "context"
+ "fmt"
+ "os"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+func (f *Filer) appendToFile(targetFile string, data []byte) error {
+
+ assignResult, uploadResult, err2 := f.assignAndUpload(data)
+ if err2 != nil {
+ return err2
+ }
+
+ // find out existing entry
+ fullpath := util.FullPath(targetFile)
+ entry, err := f.FindEntry(context.Background(), fullpath)
+ var offset int64 = 0
+ if err == filer_pb.ErrNotFound {
+ entry = &Entry{
+ FullPath: fullpath,
+ Attr: Attr{
+ Crtime: time.Now(),
+ Mtime: time.Now(),
+ Mode: os.FileMode(0644),
+ Uid: OS_UID,
+ Gid: OS_GID,
+ },
+ }
+ } else {
+ offset = int64(TotalSize(entry.Chunks))
+ }
+
+ // append to existing chunks
+ entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(assignResult.Fid, offset))
+
+ // update the entry
+ err = f.CreateEntry(context.Background(), entry, false, false, nil)
+
+ return err
+}
+
+func (f *Filer) assignAndUpload(data []byte) (*operation.AssignResult, *operation.UploadResult, error) {
+ // assign a volume location
+ assignRequest := &operation.VolumeAssignRequest{
+ Count: 1,
+ Collection: f.metaLogCollection,
+ Replication: f.metaLogReplication,
+ WritableVolumeCount: 1,
+ }
+ assignResult, err := operation.Assign(f.GetMaster(), f.GrpcDialOption, assignRequest)
+ if err != nil {
+ return nil, nil, fmt.Errorf("AssignVolume: %v", err)
+ }
+ if assignResult.Error != "" {
+ return nil, nil, fmt.Errorf("AssignVolume error: %v", assignResult.Error)
+ }
+
+ // upload data
+ targetUrl := "http://" + assignResult.Url + "/" + assignResult.Fid
+ uploadResult, err := operation.UploadData(targetUrl, "", f.Cipher, data, false, "", nil, assignResult.Auth)
+ if err != nil {
+ return nil, nil, fmt.Errorf("upload data %s: %v", targetUrl, err)
+ }
+ // println("uploaded to", targetUrl)
+ return assignResult, uploadResult, nil
+}