diff options
Diffstat (limited to 'weed/mq/broker/broker_write.go')
| -rw-r--r-- | weed/mq/broker/broker_write.go | 82 |
1 files changed, 82 insertions, 0 deletions
diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go new file mode 100644 index 000000000..866cd17c2 --- /dev/null +++ b/weed/mq/broker/broker_write.go @@ -0,0 +1,82 @@ +package broker + +import ( + "fmt" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/operation" + "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + "github.com/seaweedfs/seaweedfs/weed/util" + "os" + "time" +) + +func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error { + + fileId, uploadResult, err2 := b.assignAndUpload(targetFile, data) + if err2 != nil { + return err2 + } + + // find out existing entry + fullpath := util.FullPath(targetFile) + dir, name := fullpath.DirAndName() + entry, err := filer_pb.GetEntry(b, fullpath) + var offset int64 = 0 + if err == filer_pb.ErrNotFound { + entry = &filer_pb.Entry{ + Name: name, + IsDirectory: false, + Attributes: &filer_pb.FuseAttributes{ + Crtime: time.Now().Unix(), + Mtime: time.Now().Unix(), + FileMode: uint32(os.FileMode(0644)), + Uid: uint32(os.Getuid()), + Gid: uint32(os.Getgid()), + }, + } + } else if err != nil { + return fmt.Errorf("find %s: %v", fullpath, err) + } else { + offset = int64(filer.TotalSize(entry.GetChunks())) + } + + // append to existing chunks + entry.Chunks = append(entry.GetChunks(), uploadResult.ToPbFileChunk(fileId, offset, time.Now().UnixNano())) + + // update the entry + return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { + return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ + Directory: dir, + Entry: entry, + }) + }) +} + +func (b *MessageQueueBroker) assignAndUpload(targetFile string, data []byte) (fileId string, uploadResult *operation.UploadResult, err error) { + + reader := util.NewBytesReader(data) + fileId, uploadResult, err, _ = operation.UploadWithRetry( + b, + &filer_pb.AssignVolumeRequest{ + Count: 1, + Replication: b.option.DefaultReplication, + Collection: "topics", + // TtlSec: wfs.option.TtlSec, + // DiskType: string(wfs.option.DiskType), + DataCenter: b.option.DataCenter, + Path: targetFile, + }, + &operation.UploadOption{ + Cipher: b.option.Cipher, + }, + func(host, fileId string) string { + fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) + if b.option.VolumeServerAccess == "filerProxy" { + fileUrl = fmt.Sprintf("http://%s/?proxyChunkId=%s", b.currentFiler, fileId) + } + return fileUrl + }, + reader, + ) + return +} |
