diff options
Diffstat (limited to 'weed/mq')
| -rw-r--r-- | weed/mq/broker/broker_write.go | 5 | ||||
| -rw-r--r-- | weed/mq/logstore/log_to_parquet.go | 11 | ||||
| -rw-r--r-- | weed/mq/logstore/read_log_from_disk.go | 3 | ||||
| -rw-r--r-- | weed/mq/logstore/read_parquet_to_log.go | 3 |
4 files changed, 13 insertions, 9 deletions
diff --git a/weed/mq/broker/broker_write.go b/weed/mq/broker/broker_write.go index 9999529fb..9f3c7b50f 100644 --- a/weed/mq/broker/broker_write.go +++ b/weed/mq/broker/broker_write.go @@ -1,6 +1,7 @@ package broker import ( + "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/operation" @@ -20,7 +21,7 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error // find out existing entry fullpath := util.FullPath(targetFile) dir, name := fullpath.DirAndName() - entry, err := filer_pb.GetEntry(b, fullpath) + entry, err := filer_pb.GetEntry(context.Background(), b, fullpath) var offset int64 = 0 if err == filer_pb.ErrNotFound { entry = &filer_pb.Entry{ @@ -45,7 +46,7 @@ func (b *MessageQueueBroker) appendToFile(targetFile string, data []byte) error // update the entry return b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ + return filer_pb.CreateEntry(context.Background(), client, &filer_pb.CreateEntryRequest{ Directory: dir, Entry: entry, }) diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go index 30cad8cc1..b2ee2ae00 100644 --- a/weed/mq/logstore/log_to_parquet.go +++ b/weed/mq/logstore/log_to_parquet.go @@ -1,6 +1,7 @@ package logstore import ( + "context" "encoding/binary" "fmt" "github.com/parquet-go/parquet-go" @@ -50,7 +51,7 @@ func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, tim } func collectTopicVersions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration) (partitionVersions []time.Time, err error) { - err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()), "", func(entry *filer_pb.Entry, isLast bool) error { + err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(t.Dir()), "", func(entry *filer_pb.Entry, isLast bool) error { t, err := topic.ParseTopicVersion(entry.Name) if err != nil { // skip non-partition directories @@ -66,7 +67,7 @@ func collectTopicVersions(filerClient filer_pb.FilerClient, t topic.Topic, timeA func collectTopicVersionsPartitions(filerClient filer_pb.FilerClient, t topic.Topic, topicVersion time.Time) (partitions []topic.Partition, err error) { version := topicVersion.Format(topic.PartitionGenerationFormat) - err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()).Child(version), "", func(entry *filer_pb.Entry, isLast bool) error { + err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(t.Dir()).Child(version), "", func(entry *filer_pb.Entry, isLast bool) error { if !entry.IsDirectory { return nil } @@ -151,7 +152,7 @@ func groupFilesBySize(logFiles []*filer_pb.Entry, maxGroupSize int64) (logFileGr } func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, timeAgo time.Duration, minTsNs, maxTsNs int64) (logFiles []*filer_pb.Entry, err error) { - err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error { + err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error { if strings.HasSuffix(entry.Name, ".parquet") { return nil } @@ -173,7 +174,7 @@ func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, time } func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string) (minTsNs, maxTsNs int64, err error) { - err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error { + err = filer_pb.ReadDirAllEntries(context.Background(), filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error { if !strings.HasSuffix(entry.Name, ".parquet") { return nil } @@ -354,7 +355,7 @@ func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile // write the entry to partitionDir if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{ + return filer_pb.CreateEntry(context.Background(), client, &filer_pb.CreateEntryRequest{ Directory: partitionDir, Entry: entry, }) diff --git a/weed/mq/logstore/read_log_from_disk.go b/weed/mq/logstore/read_log_from_disk.go index 71ba58c1f..2c8a7c1de 100644 --- a/weed/mq/logstore/read_log_from_disk.go +++ b/weed/mq/logstore/read_log_from_disk.go @@ -1,6 +1,7 @@ package logstore import ( + "context" "fmt" "github.com/seaweedfs/seaweedfs/weed/filer" "github.com/seaweedfs/seaweedfs/weed/glog" @@ -113,7 +114,7 @@ func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p top stopTime := time.Unix(0, stopTsNs) var processedTsNs int64 err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error { + return filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error { if entry.IsDirectory { return nil } diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go index 4811d6c80..6c69b9f12 100644 --- a/weed/mq/logstore/read_parquet_to_log.go +++ b/weed/mq/logstore/read_parquet_to_log.go @@ -1,6 +1,7 @@ package logstore import ( + "context" "encoding/binary" "fmt" "github.com/parquet-go/parquet-go" @@ -115,7 +116,7 @@ func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error { - return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error { + return filer_pb.SeaweedList(context.Background(), client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error { if entry.IsDirectory { return nil } |
