aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chrislusf@users.noreply.github.com>2024-11-04 12:08:25 -0800
committerGitHub <noreply@github.com>2024-11-04 12:08:25 -0800
commitdc784bf217172b0e81eb4b3e5eb0e0e38b91849a (patch)
treee0795c9a6335571ca852bd84e720eb42d7b3538f
parentffe908371d4ddbc0cacbe8362a28bf56b322b349 (diff)
downloadseaweedfs-dc784bf217172b0e81eb4b3e5eb0e0e38b91849a.tar.xz
seaweedfs-dc784bf217172b0e81eb4b3e5eb0e0e38b91849a.zip
merge current message queue code changes (#6201)
* listing files to convert to parquet * write parquet files * save logs into parquet files * pass by value * compact logs into parquet format * can skip existing files * refactor * refactor * fix compilation * when no partition found * refactor * add untested parquet file read * rename package * refactor * rename files * remove unused * add merged log read func * parquet wants to know the file size * rewind by time * pass in stop ts * add stop ts * adjust log * minor * adjust log * skip .parquet files when reading message logs * skip non message files * Update subscriber_record.go * send messages * skip message data with only ts * skip non log files * update parquet-go package * ensure a valid record type * add new field to a record type * Update read_parquet_to_log.go * fix parquet file name generation * separating reading parquet and logs * add key field * add skipped logs * use in memory cache * refactor * refactor * refactor * refactor, and change compact log * refactor * rename * refactor * fix format * prefix v to version directory
-rw-r--r--go.mod2
-rw-r--r--go.sum2
-rw-r--r--weed/command/benchmark.go4
-rw-r--r--weed/command/upload.go18
-rw-r--r--weed/filer/reader_at.go4
-rw-r--r--weed/filer/topics.go5
-rw-r--r--weed/filer_client/filer_client_accessor.go36
-rw-r--r--weed/mq/broker/broker_grpc_assign.go3
-rw-r--r--weed/mq/broker/broker_grpc_configure.go2
-rw-r--r--weed/mq/broker/broker_grpc_pub_follow.go5
-rw-r--r--weed/mq/broker/broker_grpc_sub_follow.go9
-rw-r--r--weed/mq/broker/broker_topic_conf_read_write.go5
-rw-r--r--weed/mq/broker/broker_topic_partition_read_write.go137
-rw-r--r--weed/mq/client/cmd/weed_pub_record/publisher_record.go13
-rw-r--r--weed/mq/client/cmd/weed_sub_record/subscriber_record.go15
-rw-r--r--weed/mq/client/sub_client/on_each_partition.go12
-rw-r--r--weed/mq/client/sub_client/subscriber.go1
-rw-r--r--weed/mq/logstore/log_to_parquet.go454
-rw-r--r--weed/mq/logstore/merged_read.go41
-rw-r--r--weed/mq/logstore/read_log_from_disk.go144
-rw-r--r--weed/mq/logstore/read_parquet_to_log.go162
-rw-r--r--weed/mq/pub_balancer/broker_stats.go4
-rw-r--r--weed/mq/schema/schema_builder.go7
-rw-r--r--weed/mq/sub_coordinator/partition_consumer_mapping.go10
-rw-r--r--weed/mq/sub_coordinator/partition_list.go14
-rw-r--r--weed/mq/topic/local_manager.go2
-rw-r--r--weed/mq/topic/local_partition.go1
-rw-r--r--weed/mq/topic/partition.go18
-rw-r--r--weed/mq/topic/topic.go44
-rw-r--r--weed/mq/topic/topic_partition.go2
-rw-r--r--weed/operation/submit.go80
-rw-r--r--weed/shell/command_mq_topic_compact.go94
-rw-r--r--weed/util/chunk_cache/chunk_cache_in_memory.go20
33 files changed, 1106 insertions, 264 deletions
diff --git a/go.mod b/go.mod
index a0301cb96..a270518b6 100644
--- a/go.mod
+++ b/go.mod
@@ -141,7 +141,7 @@ require (
github.com/hashicorp/raft v1.7.1
github.com/hashicorp/raft-boltdb/v2 v2.3.0
github.com/orcaman/concurrent-map/v2 v2.0.1
- github.com/parquet-go/parquet-go v0.23.0
+ github.com/parquet-go/parquet-go v0.23.1-0.20241011155651-6446d1d0d2fe
github.com/rabbitmq/amqp091-go v1.10.0
github.com/rclone/rclone v1.68.1
github.com/rdleal/intervalst v1.4.0
diff --git a/go.sum b/go.sum
index be5f013dd..307e1f370 100644
--- a/go.sum
+++ b/go.sum
@@ -1338,6 +1338,8 @@ github.com/panjf2000/ants/v2 v2.9.1 h1:Q5vh5xohbsZXGcD6hhszzGqB7jSSc2/CRr3QKIga8
github.com/panjf2000/ants/v2 v2.9.1/go.mod h1:7ZxyxsqE4vvW0M7LSD8aI3cKwgFhBHbxnlN8mDqHa1I=
github.com/parquet-go/parquet-go v0.23.0 h1:dyEU5oiHCtbASyItMCD2tXtT2nPmoPbKpqf0+nnGrmk=
github.com/parquet-go/parquet-go v0.23.0/go.mod h1:MnwbUcFHU6uBYMymKAlPPAw9yh3kE1wWl6Gl1uLdkNk=
+github.com/parquet-go/parquet-go v0.23.1-0.20241011155651-6446d1d0d2fe h1:oUJ5TPnrEK/z+/PeoLL+jCgfngAZIDMyhZASetRcYYg=
+github.com/parquet-go/parquet-go v0.23.1-0.20241011155651-6446d1d0d2fe/go.mod h1:OqBBRGBl7+llplCvDMql8dEKaDqjaFA/VAPw+OJiNiw=
github.com/pascaldekloe/goe v0.1.0 h1:cBOtyMzM9HTpWjXfbbunk26uA6nG3a8n06Wieeh0MwY=
github.com/pascaldekloe/goe v0.1.0/go.mod h1:lzWF7FIEvWOWxwDKqyGYQf6ZUaNfKdP144TG7ZOy1lc=
github.com/patrickmn/go-cache v2.1.0+incompatible h1:HRMgzkcYKYpi3C8ajMPV8OFXaaRUnok+kx1WdO15EQc=
diff --git a/weed/command/benchmark.go b/weed/command/benchmark.go
index bc7ee1292..08db2ef3d 100644
--- a/weed/command/benchmark.go
+++ b/weed/command/benchmark.go
@@ -21,8 +21,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
- "github.com/seaweedfs/seaweedfs/weed/wdclient"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ "github.com/seaweedfs/seaweedfs/weed/wdclient"
)
type BenchmarkOptions struct {
@@ -242,7 +242,7 @@ func writeFiles(idChan chan int, fileIdLineChan chan string, s *stat) {
DiskType: *b.diskType,
}
if assignResult, err := operation.Assign(b.masterClient.GetMaster, b.grpcDialOption, ar); err == nil {
- fp.Server, fp.Fid, fp.Collection = assignResult.Url, assignResult.Fid, *b.collection
+ fp.Server, fp.Fid, fp.Pref.Collection = assignResult.Url, assignResult.Fid, *b.collection
if !isSecure && assignResult.Auth != "" {
isSecure = true
}
diff --git a/weed/command/upload.go b/weed/command/upload.go
index 7135a707a..9f9ac1107 100644
--- a/weed/command/upload.go
+++ b/weed/command/upload.go
@@ -97,7 +97,14 @@ func runUpload(cmd *Command, args []string) bool {
if e != nil {
return e
}
- results, e := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+ results, e := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, operation.StoragePreference{
+ Replication: *upload.replication,
+ Collection: *upload.collection,
+ DataCenter: *upload.dataCenter,
+ Ttl: *upload.ttl,
+ DiskType: *upload.diskType,
+ MaxMB: *upload.maxMB,
+ }, *upload.usePublicUrl)
bytes, _ := json.Marshal(results)
fmt.Println(string(bytes))
if e != nil {
@@ -119,7 +126,14 @@ func runUpload(cmd *Command, args []string) bool {
fmt.Println(e.Error())
return false
}
- results, err := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, *upload.replication, *upload.collection, *upload.dataCenter, *upload.ttl, *upload.diskType, *upload.maxMB, *upload.usePublicUrl)
+ results, err := operation.SubmitFiles(func(_ context.Context) pb.ServerAddress { return pb.ServerAddress(*upload.master) }, grpcDialOption, parts, operation.StoragePreference{
+ Replication: *upload.replication,
+ Collection: *upload.collection,
+ DataCenter: *upload.dataCenter,
+ Ttl: *upload.ttl,
+ DiskType: *upload.diskType,
+ MaxMB: *upload.maxMB,
+ }, *upload.usePublicUrl)
if err != nil {
fmt.Println(err.Error())
return false
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 24162995e..d7617b740 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -97,6 +97,10 @@ func NewChunkReaderAtFromClient(readerCache *ReaderCache, chunkViews *IntervalLi
}
}
+func (c *ChunkReadAt) Size() int64 {
+ return c.fileSize
+}
+
func (c *ChunkReadAt) Close() error {
c.readerCache.destroy()
return nil
diff --git a/weed/filer/topics.go b/weed/filer/topics.go
index 3a2fde8c4..707a4f878 100644
--- a/weed/filer/topics.go
+++ b/weed/filer/topics.go
@@ -1,6 +1,7 @@
package filer
const (
- TopicsDir = "/topics"
- SystemLogDir = TopicsDir + "/.system/log"
+ TopicsDir = "/topics"
+ SystemLogDir = TopicsDir + "/.system/log"
+ TopicConfFile = "topic.conf"
)
diff --git a/weed/filer_client/filer_client_accessor.go b/weed/filer_client/filer_client_accessor.go
index be70f2b82..20646d343 100644
--- a/weed/filer_client/filer_client_accessor.go
+++ b/weed/filer_client/filer_client_accessor.go
@@ -1,16 +1,12 @@
package filer_client
import (
- "bytes"
- "fmt"
- "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"google.golang.org/grpc"
- jsonpb "google.golang.org/protobuf/encoding/protojson"
)
type FilerClientAccessor struct {
@@ -22,41 +18,23 @@ func (fca *FilerClientAccessor) WithFilerClient(streamingMode bool, fn func(file
return pb.WithFilerClient(streamingMode, 0, fca.GetFiler(), fca.GetGrpcDialOption(), fn)
}
-func (fca *FilerClientAccessor) SaveTopicConfToFiler(t *mq_pb.Topic, conf *mq_pb.ConfigureTopicResponse) error {
+func (fca *FilerClientAccessor) SaveTopicConfToFiler(t topic.Topic, conf *mq_pb.ConfigureTopicResponse) error {
glog.V(0).Infof("save conf for topic %v to filer", t)
// save the topic configuration on filer
- topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
- if err := fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- var buf bytes.Buffer
- filer.ProtoToText(&buf, conf)
- return filer.SaveInsideFiler(client, topicDir, "topic.conf", buf.Bytes())
- }); err != nil {
- return fmt.Errorf("save topic to %s: %v", topicDir, err)
- }
- return nil
+ return fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return t.WriteConfFile(client, conf)
+ })
}
func (fca *FilerClientAccessor) ReadTopicConfFromFiler(t topic.Topic) (conf *mq_pb.ConfigureTopicResponse, err error) {
- glog.V(0).Infof("load conf for topic %v from filer", t)
+ glog.V(1).Infof("load conf for topic %v from filer", t)
- topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
if err = fca.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- data, err := filer.ReadInsideFiler(client, topicDir, "topic.conf")
- if err == filer_pb.ErrNotFound {
- return err
- }
- if err != nil {
- return fmt.Errorf("read topic.conf of %v: %v", t, err)
- }
- // parse into filer conf object
- conf = &mq_pb.ConfigureTopicResponse{}
- if err = jsonpb.Unmarshal(data, conf); err != nil {
- return fmt.Errorf("unmarshal topic %v conf: %v", t, err)
- }
- return nil
+ conf, err = t.ReadConfFile(client)
+ return err
}); err != nil {
return nil, err
}
diff --git a/weed/mq/broker/broker_grpc_assign.go b/weed/mq/broker/broker_grpc_assign.go
index 48ec0d5bd..9a9b34c0b 100644
--- a/weed/mq/broker/broker_grpc_assign.go
+++ b/weed/mq/broker/broker_grpc_assign.go
@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb"
@@ -26,7 +27,7 @@ func (b *MessageQueueBroker) AssignTopicPartitions(c context.Context, request *m
} else {
var localPartition *topic.LocalPartition
if localPartition = b.localTopicManager.GetLocalPartition(t, partition); localPartition == nil {
- localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+ localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition))
b.localTopicManager.AddLocalPartition(t, localPartition)
}
}
diff --git a/weed/mq/broker/broker_grpc_configure.go b/weed/mq/broker/broker_grpc_configure.go
index 7222c8359..361af5c43 100644
--- a/weed/mq/broker/broker_grpc_configure.go
+++ b/weed/mq/broker/broker_grpc_configure.go
@@ -67,7 +67,7 @@ func (b *MessageQueueBroker) ConfigureTopic(ctx context.Context, request *mq_pb.
resp.RecordType = request.RecordType
// save the topic configuration on filer
- if err := b.fca.SaveTopicConfToFiler(request.Topic, resp); err != nil {
+ if err := b.fca.SaveTopicConfToFiler(t, resp); err != nil {
return nil, fmt.Errorf("configure topic: %v", err)
}
diff --git a/weed/mq/broker/broker_grpc_pub_follow.go b/weed/mq/broker/broker_grpc_pub_follow.go
index 8995b0cc2..291f1ef62 100644
--- a/weed/mq/broker/broker_grpc_pub_follow.go
+++ b/weed/mq/broker/broker_grpc_pub_follow.go
@@ -2,7 +2,6 @@ package broker
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -93,9 +92,7 @@ func (b *MessageQueueBroker) PublishFollowMe(stream mq_pb.SeaweedMessaging_Publi
time.Sleep(113 * time.Millisecond)
}
- topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
- partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
- partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
+ partitionDir := topic.PartitionDir(t, p)
// flush the remaining messages
inMemoryBuffers.CloseInput()
diff --git a/weed/mq/broker/broker_grpc_sub_follow.go b/weed/mq/broker/broker_grpc_sub_follow.go
index f7f4ac7e9..bed906c30 100644
--- a/weed/mq/broker/broker_grpc_sub_follow.go
+++ b/weed/mq/broker/broker_grpc_sub_follow.go
@@ -9,7 +9,6 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
- "time"
)
func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_SubscribeFollowMeServer) (err error) {
@@ -65,9 +64,7 @@ func (b *MessageQueueBroker) SubscribeFollowMe(stream mq_pb.SeaweedMessaging_Sub
func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.SubscribeMessageRequest_InitMessage) (offset int64, err error) {
t, p := topic.FromPbTopic(initMessage.Topic), topic.FromPbPartition(initMessage.PartitionOffset.Partition)
- topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
- partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
- partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
+ partitionDir := topic.PartitionDir(t, p)
offsetFileName := fmt.Sprintf("%s.offset", initMessage.ConsumerGroup)
err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
@@ -86,9 +83,7 @@ func (b *MessageQueueBroker) readConsumerGroupOffset(initMessage *mq_pb.Subscrib
func (b *MessageQueueBroker) saveConsumerGroupOffset(t topic.Topic, p topic.Partition, consumerGroup string, offset int64) error {
- topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
- partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
- partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, p.RangeStart, p.RangeStop)
+ partitionDir := topic.PartitionDir(t, p)
offsetFileName := fmt.Sprintf("%s.offset", consumerGroup)
offsetBytes := make([]byte, 8)
diff --git a/weed/mq/broker/broker_topic_conf_read_write.go b/weed/mq/broker/broker_topic_conf_read_write.go
index ea5cb71b9..222ff16ba 100644
--- a/weed/mq/broker/broker_topic_conf_read_write.go
+++ b/weed/mq/broker/broker_topic_conf_read_write.go
@@ -3,6 +3,7 @@ package broker
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/logstore"
"github.com/seaweedfs/seaweedfs/weed/mq/pub_balancer"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
@@ -40,7 +41,7 @@ func (b *MessageQueueBroker) genLocalPartitionFromFiler(t topic.Topic, partition
self := b.option.BrokerAddress()
for _, assignment := range conf.BrokerPartitionAssignments {
if assignment.LeaderBroker == string(self) && partition.Equals(topic.FromPbPartition(assignment.Partition)) {
- localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, assignment.Partition), b.genLogOnDiskReadFunc(t, assignment.Partition))
+ localPartition = topic.NewLocalPartition(partition, b.genLogFlushFunc(t, partition), logstore.GenMergedReadFunc(b, t, partition))
b.localTopicManager.AddLocalPartition(t, localPartition)
isGenerated = true
break
@@ -55,7 +56,7 @@ func (b *MessageQueueBroker) ensureTopicActiveAssignments(t topic.Topic, conf *m
hasChanges := pub_balancer.EnsureAssignmentsToActiveBrokers(b.PubBalancer.Brokers, 1, conf.BrokerPartitionAssignments)
if hasChanges {
glog.V(0).Infof("topic %v partition updated assignments: %v", t, conf.BrokerPartitionAssignments)
- if err = b.fca.SaveTopicConfToFiler(t.ToPbTopic(), conf); err != nil {
+ if err = b.fca.SaveTopicConfToFiler(t, conf); err != nil {
return err
}
}
diff --git a/weed/mq/broker/broker_topic_partition_read_write.go b/weed/mq/broker/broker_topic_partition_read_write.go
index 4c1b9a1e2..d6513b2a2 100644
--- a/weed/mq/broker/broker_topic_partition_read_write.go
+++ b/weed/mq/broker/broker_topic_partition_read_write.go
@@ -2,24 +2,15 @@ package broker
import (
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
- "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "github.com/seaweedfs/seaweedfs/weed/util"
"github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
- "google.golang.org/protobuf/proto"
- "math"
"sync/atomic"
"time"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
-func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogFlushFuncType {
- topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
- partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
- partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
+func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, p topic.Partition) log_buffer.LogFlushFuncType {
+ partitionDir := topic.PartitionDir(t, p)
return func(logBuffer *log_buffer.LogBuffer, startTime, stopTime time.Time, buf []byte) {
if len(buf) == 0 {
@@ -45,7 +36,6 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
b.accessLock.Lock()
defer b.accessLock.Unlock()
- p := topic.FromPbPartition(partition)
if localPartition := b.localTopicManager.GetLocalPartition(t, p); localPartition != nil {
localPartition.NotifyLogFlushed(logBuffer.LastFlushTsNs)
}
@@ -53,126 +43,3 @@ func (b *MessageQueueBroker) genLogFlushFunc(t topic.Topic, partition *mq_pb.Par
glog.V(0).Infof("flushing at %d to %s size %d", logBuffer.LastFlushTsNs, targetFile, len(buf))
}
}
-
-func (b *MessageQueueBroker) genLogOnDiskReadFunc(t topic.Topic, partition *mq_pb.Partition) log_buffer.LogReadFromDiskFuncType {
- topicDir := fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
- partitionGeneration := time.Unix(0, partition.UnixTimeNs).UTC().Format(topic.TIME_FORMAT)
- partitionDir := fmt.Sprintf("%s/%s/%04d-%04d", topicDir, partitionGeneration, partition.RangeStart, partition.RangeStop)
-
- lookupFileIdFn := func(fileId string) (targetUrls []string, err error) {
- return b.MasterClient.LookupFileId(fileId)
- }
-
- eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
- for pos := 0; pos+4 < len(buf); {
-
- size := util.BytesToUint32(buf[pos : pos+4])
- if pos+4+int(size) > len(buf) {
- err = fmt.Errorf("LogOnDiskReadFunc: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
- return
- }
- entryData := buf[pos+4 : pos+4+int(size)]
-
- logEntry := &filer_pb.LogEntry{}
- if err = proto.Unmarshal(entryData, logEntry); err != nil {
- pos += 4 + int(size)
- err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
- return
- }
- if logEntry.TsNs < starTsNs {
- pos += 4 + int(size)
- continue
- }
- if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
- println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
- return
- }
-
- if _, err = eachLogEntryFn(logEntry); err != nil {
- err = fmt.Errorf("process log entry %v: %v", logEntry, err)
- return
- }
-
- processedTsNs = logEntry.TsNs
-
- pos += 4 + int(size)
-
- }
-
- return
- }
-
- eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
- if len(entry.Content) > 0 {
- // skip .offset files
- return
- }
- var urlStrings []string
- for _, chunk := range entry.Chunks {
- if chunk.Size == 0 {
- continue
- }
- if chunk.IsChunkManifest {
- glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
- return
- }
- urlStrings, err = lookupFileIdFn(chunk.FileId)
- if err != nil {
- err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
- return
- }
- if len(urlStrings) == 0 {
- err = fmt.Errorf("no url found for %s", chunk.FileId)
- return
- }
-
- // try one of the urlString until util.Get(urlString) succeeds
- var processed bool
- for _, urlString := range urlStrings {
- // TODO optimization opportunity: reuse the buffer
- var data []byte
- if data, _, err = util_http.Get(urlString); err == nil {
- processed = true
- if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil {
- return
- }
- break
- }
- }
- if !processed {
- err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
- return
- }
-
- }
- return
- }
-
- return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
- startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
- startTsNs := startPosition.Time.UnixNano()
- stopTime := time.Unix(0, stopTsNs)
- var processedTsNs int64
- err = b.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
- return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
- if entry.IsDirectory {
- return nil
- }
- if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
- isDone = true
- return nil
- }
- if entry.Name < startPosition.UTC().Format(topic.TIME_FORMAT) {
- return nil
- }
- if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
- return err
- }
- return nil
-
- }, startFileName, true, math.MaxInt32)
- })
- lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
- return
- }
-}
diff --git a/weed/mq/client/cmd/weed_pub_record/publisher_record.go b/weed/mq/client/cmd/weed_pub_record/publisher_record.go
index f340dd1c8..9b28200bc 100644
--- a/weed/mq/client/cmd/weed_pub_record/publisher_record.go
+++ b/weed/mq/client/cmd/weed_pub_record/publisher_record.go
@@ -7,11 +7,12 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/schema"
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"log"
"strings"
"sync"
+ "sync/atomic"
"time"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -25,11 +26,17 @@ var (
namespace = flag.String("ns", "test", "namespace")
t = flag.String("t", "test", "t")
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
+
+ counter int32
)
func doPublish(publisher *pub_client.TopicPublisher, id int) {
startTime := time.Now()
- for i := 0; i < *messageCount / *concurrency; i++ {
+ for {
+ i := atomic.AddInt32(&counter, 1)
+ if i > int32(*messageCount) {
+ break
+ }
// Simulate publishing a message
myRecord := genMyRecord(int32(i))
if err := publisher.PublishRecord(myRecord.Key, myRecord.ToRecordValue()); err != nil {
@@ -38,7 +45,7 @@ func doPublish(publisher *pub_client.TopicPublisher, id int) {
}
if *messageDelay > 0 {
time.Sleep(*messageDelay)
- fmt.Printf("sent %+v\n", myRecord)
+ fmt.Printf("sent %+v\n", string(myRecord.Key))
}
}
if err := publisher.FinishPublish(); err != nil {
diff --git a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
index 00fe83feb..7bdff3715 100644
--- a/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
+++ b/weed/mq/client/cmd/weed_sub_record/subscriber_record.go
@@ -8,12 +8,12 @@ import (
"github.com/seaweedfs/seaweedfs/weed/mq/topic"
"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"google.golang.org/protobuf/proto"
"strings"
"time"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
@@ -22,6 +22,7 @@ var (
seedBrokers = flag.String("brokers", "localhost:17777", "seed brokers")
maxPartitionCount = flag.Int("maxPartitionCount", 3, "max partition count")
perPartitionConcurrency = flag.Int("perPartitionConcurrency", 1, "per partition concurrency")
+ timeAgo = flag.Duration("timeAgo", 1*time.Hour, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
clientId = flag.Uint("client_id", uint(util.RandomInt32()), "client id")
)
@@ -65,7 +66,7 @@ func main() {
contentConfig := &sub_client.ContentConfiguration{
Topic: topic.NewTopic(*namespace, *t),
Filter: "",
- StartTime: time.Unix(1, 1),
+ StartTime: time.Now().Add(-*timeAgo),
}
brokers := strings.Split(*seedBrokers, ",")
@@ -75,9 +76,13 @@ func main() {
subscriber.SetEachMessageFunc(func(key, value []byte) error {
counter++
record := &schema_pb.RecordValue{}
- proto.Unmarshal(value, record)
- fmt.Printf("record: %v\n", record)
- time.Sleep(1300 * time.Millisecond)
+ err := proto.Unmarshal(value, record)
+ if err != nil {
+ fmt.Printf("unmarshal record value: %v\n", err)
+ } else {
+ fmt.Printf("%s %d: %v\n", string(key), len(value), record)
+ }
+ //time.Sleep(1300 * time.Millisecond)
return nil
})
diff --git a/weed/mq/client/sub_client/on_each_partition.go b/weed/mq/client/sub_client/on_each_partition.go
index 58d87d9ad..56cedb32e 100644
--- a/weed/mq/client/sub_client/on_each_partition.go
+++ b/weed/mq/client/sub_client/on_each_partition.go
@@ -8,6 +8,7 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
"github.com/seaweedfs/seaweedfs/weed/util"
"io"
+ "reflect"
)
func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssignment, stopCh chan struct{}) error {
@@ -24,6 +25,11 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
perPartitionConcurrency = 1
}
+ var stopTsNs int64
+ if !sub.ContentConfig.StopTime.IsZero() {
+ stopTsNs = sub.ContentConfig.StopTime.UnixNano()
+ }
+
if err = subscribeClient.Send(&mq_pb.SubscribeMessageRequest{
Message: &mq_pb.SubscribeMessageRequest_Init{
Init: &mq_pb.SubscribeMessageRequest_InitMessage{
@@ -32,6 +38,8 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
Topic: sub.ContentConfig.Topic.ToPbTopic(),
PartitionOffset: &mq_pb.PartitionOffset{
Partition: assigned.Partition,
+ StartTsNs: sub.ContentConfig.StartTime.UnixNano(),
+ StopTsNs: stopTsNs,
StartType: mq_pb.PartitionOffsetStartType_EARLIEST_IN_MEMORY,
},
Filter: sub.ContentConfig.Filter,
@@ -101,6 +109,10 @@ func (sub *TopicSubscriber) onEachPartition(assigned *mq_pb.BrokerPartitionAssig
glog.V(2).Infof("subscriber %s received control from producer:%s isClose:%v", sub.SubscriberConfig.ConsumerGroup, m.Data.Ctrl.PublisherName, m.Data.Ctrl.IsClose)
continue
}
+ if len(m.Data.Key) == 0 {
+ fmt.Printf("empty key %+v, type %v\n", m, reflect.TypeOf(m))
+ continue
+ }
executors.Execute(func() {
processErr := sub.OnEachMessageFunc(m.Data.Key, m.Data.Value)
if processErr == nil {
diff --git a/weed/mq/client/sub_client/subscriber.go b/weed/mq/client/sub_client/subscriber.go
index 922593b77..3e5316b67 100644
--- a/weed/mq/client/sub_client/subscriber.go
+++ b/weed/mq/client/sub_client/subscriber.go
@@ -21,6 +21,7 @@ type ContentConfiguration struct {
Topic topic.Topic
Filter string
StartTime time.Time
+ StopTime time.Time
}
type OnEachMessageFunc func(key, value []byte) (err error)
diff --git a/weed/mq/logstore/log_to_parquet.go b/weed/mq/logstore/log_to_parquet.go
new file mode 100644
index 000000000..30cad8cc1
--- /dev/null
+++ b/weed/mq/logstore/log_to_parquet.go
@@ -0,0 +1,454 @@
+package logstore
+
+import (
+ "encoding/binary"
+ "fmt"
+ "github.com/parquet-go/parquet-go"
+ "github.com/parquet-go/parquet-go/compress/zstd"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "google.golang.org/protobuf/proto"
+ "io"
+ "os"
+ "strings"
+ "time"
+)
+
+const (
+ SW_COLUMN_NAME_TS = "_ts_ns"
+ SW_COLUMN_NAME_KEY = "_key"
+)
+
+func CompactTopicPartitions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
+ // list the topic partition versions
+ topicVersions, err := collectTopicVersions(filerClient, t, timeAgo)
+ if err != nil {
+ return fmt.Errorf("list topic files: %v", err)
+ }
+
+ // compact the partitions
+ for _, topicVersion := range topicVersions {
+ partitions, err := collectTopicVersionsPartitions(filerClient, t, topicVersion)
+ if err != nil {
+ return fmt.Errorf("list partitions %s/%s/%s: %v", t.Namespace, t.Name, topicVersion, err)
+ }
+ for _, partition := range partitions {
+ err := compactTopicPartition(filerClient, t, timeAgo, recordType, partition, preference)
+ if err != nil {
+ return fmt.Errorf("compact partition %s/%s/%s/%s: %v", t.Namespace, t.Name, topicVersion, partition, err)
+ }
+ }
+ }
+ return nil
+}
+
+func collectTopicVersions(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration) (partitionVersions []time.Time, err error) {
+ err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()), "", func(entry *filer_pb.Entry, isLast bool) error {
+ t, err := topic.ParseTopicVersion(entry.Name)
+ if err != nil {
+ // skip non-partition directories
+ return nil
+ }
+ if t.Unix() < time.Now().Unix()-int64(timeAgo/time.Second) {
+ partitionVersions = append(partitionVersions, t)
+ }
+ return nil
+ })
+ return
+}
+
+func collectTopicVersionsPartitions(filerClient filer_pb.FilerClient, t topic.Topic, topicVersion time.Time) (partitions []topic.Partition, err error) {
+ version := topicVersion.Format(topic.PartitionGenerationFormat)
+ err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(t.Dir()).Child(version), "", func(entry *filer_pb.Entry, isLast bool) error {
+ if !entry.IsDirectory {
+ return nil
+ }
+ start, stop := topic.ParsePartitionBoundary(entry.Name)
+ if start != stop {
+ partitions = append(partitions, topic.Partition{
+ RangeStart: start,
+ RangeStop: stop,
+ RingSize: topic.PartitionCount,
+ UnixTimeNs: topicVersion.UnixNano(),
+ })
+ }
+ return nil
+ })
+ return
+}
+
+func compactTopicPartition(filerClient filer_pb.FilerClient, t topic.Topic, timeAgo time.Duration, recordType *schema_pb.RecordType, partition topic.Partition, preference *operation.StoragePreference) error {
+ partitionDir := topic.PartitionDir(t, partition)
+
+ // compact the partition directory
+ return compactTopicPartitionDir(filerClient, t.Name, partitionDir, timeAgo, recordType, preference)
+}
+
+func compactTopicPartitionDir(filerClient filer_pb.FilerClient, topicName, partitionDir string, timeAgo time.Duration, recordType *schema_pb.RecordType, preference *operation.StoragePreference) error {
+ // read all existing parquet files
+ minTsNs, maxTsNs, err := readAllParquetFiles(filerClient, partitionDir)
+ if err != nil {
+ return err
+ }
+
+ // read all log files
+ logFiles, err := readAllLogFiles(filerClient, partitionDir, timeAgo, minTsNs, maxTsNs)
+ if err != nil {
+ return err
+ }
+ if len(logFiles) == 0 {
+ return nil
+ }
+
+ // divide log files into groups of 128MB
+ logFileGroups := groupFilesBySize(logFiles, 128*1024*1024)
+
+ // write to parquet file
+ parquetLevels, err := schema.ToParquetLevels(recordType)
+ if err != nil {
+ return fmt.Errorf("ToParquetLevels failed %+v: %v", recordType, err)
+ }
+
+ // create a parquet schema
+ parquetSchema, err := schema.ToParquetSchema(topicName, recordType)
+ if err != nil {
+ return fmt.Errorf("ToParquetSchema failed: %v", err)
+ }
+
+ // TODO parallelize the writing
+ for _, logFileGroup := range logFileGroups {
+ if err = writeLogFilesToParquet(filerClient, partitionDir, recordType, logFileGroup, parquetSchema, parquetLevels, preference); err != nil {
+ return err
+ }
+ }
+
+ return nil
+}
+
+func groupFilesBySize(logFiles []*filer_pb.Entry, maxGroupSize int64) (logFileGroups [][]*filer_pb.Entry) {
+ var logFileGroup []*filer_pb.Entry
+ var groupSize int64
+ for _, logFile := range logFiles {
+ if groupSize+int64(logFile.Attributes.FileSize) > maxGroupSize {
+ logFileGroups = append(logFileGroups, logFileGroup)
+ logFileGroup = nil
+ groupSize = 0
+ }
+ logFileGroup = append(logFileGroup, logFile)
+ groupSize += int64(logFile.Attributes.FileSize)
+ }
+ if len(logFileGroup) > 0 {
+ logFileGroups = append(logFileGroups, logFileGroup)
+ }
+ return
+}
+
+func readAllLogFiles(filerClient filer_pb.FilerClient, partitionDir string, timeAgo time.Duration, minTsNs, maxTsNs int64) (logFiles []*filer_pb.Entry, err error) {
+ err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
+ if strings.HasSuffix(entry.Name, ".parquet") {
+ return nil
+ }
+ if entry.Attributes.Crtime > time.Now().Unix()-int64(timeAgo/time.Second) {
+ return nil
+ }
+ logTime, err := time.Parse(topic.TIME_FORMAT, entry.Name)
+ if err != nil {
+ // glog.Warningf("parse log time %s: %v", entry.Name, err)
+ return nil
+ }
+ if maxTsNs > 0 && logTime.UnixNano() <= maxTsNs {
+ return nil
+ }
+ logFiles = append(logFiles, entry)
+ return nil
+ })
+ return
+}
+
+func readAllParquetFiles(filerClient filer_pb.FilerClient, partitionDir string) (minTsNs, maxTsNs int64, err error) {
+ err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(partitionDir), "", func(entry *filer_pb.Entry, isLast bool) error {
+ if !strings.HasSuffix(entry.Name, ".parquet") {
+ return nil
+ }
+ if len(entry.Extended) == 0 {
+ return nil
+ }
+
+ // read min ts
+ minTsBytes := entry.Extended["min"]
+ if len(minTsBytes) != 8 {
+ return nil
+ }
+ minTs := int64(binary.BigEndian.Uint64(minTsBytes))
+ if minTsNs == 0 || minTs < minTsNs {
+ minTsNs = minTs
+ }
+
+ // read max ts
+ maxTsBytes := entry.Extended["max"]
+ if len(maxTsBytes) != 8 {
+ return nil
+ }
+ maxTs := int64(binary.BigEndian.Uint64(maxTsBytes))
+ if maxTsNs == 0 || maxTs > maxTsNs {
+ maxTsNs = maxTs
+ }
+ return nil
+ })
+ return
+}
+
+func writeLogFilesToParquet(filerClient filer_pb.FilerClient, partitionDir string, recordType *schema_pb.RecordType, logFileGroups []*filer_pb.Entry, parquetSchema *parquet.Schema, parquetLevels *schema.ParquetLevels, preference *operation.StoragePreference) (err error) {
+
+ tempFile, err := os.CreateTemp(".", "t*.parquet")
+ if err != nil {
+ return fmt.Errorf("create temp file: %v", err)
+ }
+ defer func() {
+ tempFile.Close()
+ os.Remove(tempFile.Name())
+ }()
+
+ writer := parquet.NewWriter(tempFile, parquetSchema, parquet.Compression(&zstd.Codec{Level: zstd.DefaultLevel}))
+ rowBuilder := parquet.NewRowBuilder(parquetSchema)
+
+ var startTsNs, stopTsNs int64
+
+ for _, logFile := range logFileGroups {
+ fmt.Printf("compact %s/%s ", partitionDir, logFile.Name)
+ var rows []parquet.Row
+ if err := iterateLogEntries(filerClient, logFile, func(entry *filer_pb.LogEntry) error {
+
+ if startTsNs == 0 {
+ startTsNs = entry.TsNs
+ }
+ stopTsNs = entry.TsNs
+
+ if len(entry.Key) == 0 {
+ return nil
+ }
+
+ // write to parquet file
+ rowBuilder.Reset()
+
+ record := &schema_pb.RecordValue{}
+ if err := proto.Unmarshal(entry.Data, record); err != nil {
+ return fmt.Errorf("unmarshal record value: %v", err)
+ }
+
+ record.Fields[SW_COLUMN_NAME_TS] = &schema_pb.Value{
+ Kind: &schema_pb.Value_Int64Value{
+ Int64Value: entry.TsNs,
+ },
+ }
+ record.Fields[SW_COLUMN_NAME_KEY] = &schema_pb.Value{
+ Kind: &schema_pb.Value_BytesValue{
+ BytesValue: entry.Key,
+ },
+ }
+
+ if err := schema.AddRecordValue(rowBuilder, recordType, parquetLevels, record); err != nil {
+ return fmt.Errorf("add record value: %v", err)
+ }
+
+ rows = append(rows, rowBuilder.Row())
+
+ return nil
+
+ }); err != nil {
+ return fmt.Errorf("iterate log entry %v/%v: %v", partitionDir, logFile.Name, err)
+ }
+
+ fmt.Printf("processed %d rows\n", len(rows))
+
+ if _, err := writer.WriteRows(rows); err != nil {
+ return fmt.Errorf("write rows: %v", err)
+ }
+ }
+
+ if err := writer.Close(); err != nil {
+ return fmt.Errorf("close writer: %v", err)
+ }
+
+ // write to parquet file to partitionDir
+ parquetFileName := fmt.Sprintf("%s.parquet", time.Unix(0, startTsNs).UTC().Format("2006-01-02-15-04-05"))
+ if err := saveParquetFileToPartitionDir(filerClient, tempFile, partitionDir, parquetFileName, preference, startTsNs, stopTsNs); err != nil {
+ return fmt.Errorf("save parquet file %s: %v", parquetFileName, err)
+ }
+
+ return nil
+
+}
+
+func saveParquetFileToPartitionDir(filerClient filer_pb.FilerClient, sourceFile *os.File, partitionDir, parquetFileName string, preference *operation.StoragePreference, startTsNs, stopTsNs int64) error {
+ uploader, err := operation.NewUploader()
+ if err != nil {
+ return fmt.Errorf("new uploader: %v", err)
+ }
+
+ // get file size
+ fileInfo, err := sourceFile.Stat()
+ if err != nil {
+ return fmt.Errorf("stat source file: %v", err)
+ }
+
+ // upload file in chunks
+ chunkSize := int64(4 * 1024 * 1024)
+ chunkCount := (fileInfo.Size() + chunkSize - 1) / chunkSize
+ entry := &filer_pb.Entry{
+ Name: parquetFileName,
+ Attributes: &filer_pb.FuseAttributes{
+ Crtime: time.Now().Unix(),
+ Mtime: time.Now().Unix(),
+ FileMode: uint32(os.FileMode(0644)),
+ FileSize: uint64(fileInfo.Size()),
+ Mime: "application/vnd.apache.parquet",
+ },
+ }
+ entry.Extended = make(map[string][]byte)
+ minTsBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(minTsBytes, uint64(startTsNs))
+ entry.Extended["min"] = minTsBytes
+ maxTsBytes := make([]byte, 8)
+ binary.BigEndian.PutUint64(maxTsBytes, uint64(stopTsNs))
+ entry.Extended["max"] = maxTsBytes
+
+ for i := int64(0); i < chunkCount; i++ {
+ fileId, uploadResult, err, _ := uploader.UploadWithRetry(
+ filerClient,
+ &filer_pb.AssignVolumeRequest{
+ Count: 1,
+ Replication: preference.Replication,
+ Collection: preference.Collection,
+ TtlSec: 0, // TODO set ttl
+ DiskType: preference.DiskType,
+ Path: partitionDir + "/" + parquetFileName,
+ },
+ &operation.UploadOption{
+ Filename: parquetFileName,
+ Cipher: false,
+ IsInputCompressed: false,
+ MimeType: "application/vnd.apache.parquet",
+ PairMap: nil,
+ },
+ func(host, fileId string) string {
+ return fmt.Sprintf("http://%s/%s", host, fileId)
+ },
+ io.NewSectionReader(sourceFile, i*chunkSize, chunkSize),
+ )
+ if err != nil {
+ return fmt.Errorf("upload chunk %d: %v", i, err)
+ }
+ if uploadResult.Error != "" {
+ return fmt.Errorf("upload result: %v", uploadResult.Error)
+ }
+ entry.Chunks = append(entry.Chunks, uploadResult.ToPbFileChunk(fileId, i*chunkSize, time.Now().UnixNano()))
+ }
+
+ // write the entry to partitionDir
+ if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return filer_pb.CreateEntry(client, &filer_pb.CreateEntryRequest{
+ Directory: partitionDir,
+ Entry: entry,
+ })
+ }); err != nil {
+ return fmt.Errorf("create entry: %v", err)
+ }
+ fmt.Printf("saved to %s/%s\n", partitionDir, parquetFileName)
+
+ return nil
+}
+
+func iterateLogEntries(filerClient filer_pb.FilerClient, logFile *filer_pb.Entry, eachLogEntryFn func(entry *filer_pb.LogEntry) error) error {
+ lookupFn := filer.LookupFn(filerClient)
+ _, err := eachFile(logFile, lookupFn, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
+ if err := eachLogEntryFn(logEntry); err != nil {
+ return true, err
+ }
+ return false, nil
+ })
+ return err
+}
+
+func eachFile(entry *filer_pb.Entry, lookupFileIdFn func(fileId string) (targetUrls []string, err error), eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
+ if len(entry.Content) > 0 {
+ // skip .offset files
+ return
+ }
+ var urlStrings []string
+ for _, chunk := range entry.Chunks {
+ if chunk.Size == 0 {
+ continue
+ }
+ if chunk.IsChunkManifest {
+ fmt.Printf("this should not happen. unexpected chunk manifest in %s", entry.Name)
+ return
+ }
+ urlStrings, err = lookupFileIdFn(chunk.FileId)
+ if err != nil {
+ err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
+ return
+ }
+ if len(urlStrings) == 0 {
+ err = fmt.Errorf("no url found for %s", chunk.FileId)
+ return
+ }
+
+ // try one of the urlString until util.Get(urlString) succeeds
+ var processed bool
+ for _, urlString := range urlStrings {
+ var data []byte
+ if data, _, err = util_http.Get(urlString); err == nil {
+ processed = true
+ if processedTsNs, err = eachChunk(data, eachLogEntryFn); err != nil {
+ return
+ }
+ break
+ }
+ }
+ if !processed {
+ err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
+ return
+ }
+
+ }
+ return
+}
+
+func eachChunk(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType) (processedTsNs int64, err error) {
+ for pos := 0; pos+4 < len(buf); {
+
+ size := util.BytesToUint32(buf[pos : pos+4])
+ if pos+4+int(size) > len(buf) {
+ err = fmt.Errorf("reach each log chunk: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
+ return
+ }
+ entryData := buf[pos+4 : pos+4+int(size)]
+
+ logEntry := &filer_pb.LogEntry{}
+ if err = proto.Unmarshal(entryData, logEntry); err != nil {
+ pos += 4 + int(size)
+ err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
+ return
+ }
+
+ if _, err = eachLogEntryFn(logEntry); err != nil {
+ err = fmt.Errorf("process log entry %v: %v", logEntry, err)
+ return
+ }
+
+ processedTsNs = logEntry.TsNs
+
+ pos += 4 + int(size)
+
+ }
+
+ return
+}
diff --git a/weed/mq/logstore/merged_read.go b/weed/mq/logstore/merged_read.go
new file mode 100644
index 000000000..03a47ace4
--- /dev/null
+++ b/weed/mq/logstore/merged_read.go
@@ -0,0 +1,41 @@
+package logstore
+
+import (
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+)
+
+func GenMergedReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
+ fromParquetFn := GenParquetReadFunc(filerClient, t, p)
+ readLogDirectFn := GenLogOnDiskReadFunc(filerClient, t, p)
+ return mergeReadFuncs(fromParquetFn, readLogDirectFn)
+}
+
+func mergeReadFuncs(fromParquetFn, readLogDirectFn log_buffer.LogReadFromDiskFuncType) log_buffer.LogReadFromDiskFuncType {
+ var exhaustedParquet bool
+ var lastProcessedPosition log_buffer.MessagePosition
+ return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
+ if !exhaustedParquet {
+ // glog.V(4).Infof("reading from parquet startPosition: %v\n", startPosition.UTC())
+ lastReadPosition, isDone, err = fromParquetFn(startPosition, stopTsNs, eachLogEntryFn)
+ // glog.V(4).Infof("read from parquet: %v %v %v %v\n", startPosition, lastReadPosition, isDone, err)
+ if isDone {
+ isDone = false
+ }
+ if err != nil {
+ return
+ }
+ lastProcessedPosition = lastReadPosition
+ }
+ exhaustedParquet = true
+
+ if startPosition.Before(lastProcessedPosition.Time) {
+ startPosition = lastProcessedPosition
+ }
+
+ // glog.V(4).Infof("reading from direct log startPosition: %v\n", startPosition.UTC())
+ lastReadPosition, isDone, err = readLogDirectFn(startPosition, stopTsNs, eachLogEntryFn)
+ return
+ }
+}
diff --git a/weed/mq/logstore/read_log_from_disk.go b/weed/mq/logstore/read_log_from_disk.go
new file mode 100644
index 000000000..c3c679b87
--- /dev/null
+++ b/weed/mq/logstore/read_log_from_disk.go
@@ -0,0 +1,144 @@
+package logstore
+
+import (
+ "fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/glog"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "google.golang.org/protobuf/proto"
+ "math"
+ "strings"
+ "time"
+)
+
+func GenLogOnDiskReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
+ partitionDir := topic.PartitionDir(t, p)
+
+ lookupFileIdFn := filer.LookupFn(filerClient)
+
+ eachChunkFn := func(buf []byte, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
+ for pos := 0; pos+4 < len(buf); {
+
+ size := util.BytesToUint32(buf[pos : pos+4])
+ if pos+4+int(size) > len(buf) {
+ err = fmt.Errorf("GenLogOnDiskReadFunc: read [%d,%d) from [0,%d)", pos, pos+int(size)+4, len(buf))
+ return
+ }
+ entryData := buf[pos+4 : pos+4+int(size)]
+
+ logEntry := &filer_pb.LogEntry{}
+ if err = proto.Unmarshal(entryData, logEntry); err != nil {
+ pos += 4 + int(size)
+ err = fmt.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
+ return
+ }
+ if logEntry.TsNs < starTsNs {
+ pos += 4 + int(size)
+ continue
+ }
+ if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
+ println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
+ return
+ }
+
+ // fmt.Printf(" read logEntry: %v, ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC())
+ if _, err = eachLogEntryFn(logEntry); err != nil {
+ err = fmt.Errorf("process log entry %v: %v", logEntry, err)
+ return
+ }
+
+ processedTsNs = logEntry.TsNs
+
+ pos += 4 + int(size)
+
+ }
+
+ return
+ }
+
+ eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
+ if len(entry.Content) > 0 {
+ // skip .offset files
+ return
+ }
+ var urlStrings []string
+ for _, chunk := range entry.Chunks {
+ if chunk.Size == 0 {
+ continue
+ }
+ if chunk.IsChunkManifest {
+ glog.Warningf("this should not happen. unexpected chunk manifest in %s/%s", partitionDir, entry.Name)
+ return
+ }
+ urlStrings, err = lookupFileIdFn(chunk.FileId)
+ if err != nil {
+ err = fmt.Errorf("lookup %s: %v", chunk.FileId, err)
+ return
+ }
+ if len(urlStrings) == 0 {
+ err = fmt.Errorf("no url found for %s", chunk.FileId)
+ return
+ }
+
+ // try one of the urlString until util.Get(urlString) succeeds
+ var processed bool
+ for _, urlString := range urlStrings {
+ // TODO optimization opportunity: reuse the buffer
+ var data []byte
+ // fmt.Printf("reading %s/%s %s\n", partitionDir, entry.Name, urlString)
+ if data, _, err = util_http.Get(urlString); err == nil {
+ processed = true
+ if processedTsNs, err = eachChunkFn(data, eachLogEntryFn, starTsNs, stopTsNs); err != nil {
+ return
+ }
+ break
+ }
+ }
+ if !processed {
+ err = fmt.Errorf("no data processed for %s %s", entry.Name, chunk.FileId)
+ return
+ }
+
+ }
+ return
+ }
+
+ return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
+ startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
+ startTsNs := startPosition.Time.UnixNano()
+ stopTime := time.Unix(0, stopTsNs)
+ var processedTsNs int64
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+ if entry.IsDirectory {
+ return nil
+ }
+ if strings.HasSuffix(entry.Name, ".parquet") {
+ return nil
+ }
+ // FIXME: this is a hack to skip the .offset files
+ if strings.HasSuffix(entry.Name, ".offset") {
+ return nil
+ }
+ if stopTsNs != 0 && entry.Name > stopTime.UTC().Format(topic.TIME_FORMAT) {
+ isDone = true
+ return nil
+ }
+ if entry.Name < startPosition.UTC().Format(topic.TIME_FORMAT) {
+ return nil
+ }
+ if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
+ return err
+ }
+ return nil
+
+ }, startFileName, true, math.MaxInt32)
+ })
+ lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
+ return
+ }
+}
diff --git a/weed/mq/logstore/read_parquet_to_log.go b/weed/mq/logstore/read_parquet_to_log.go
new file mode 100644
index 000000000..f55d5e3b7
--- /dev/null
+++ b/weed/mq/logstore/read_parquet_to_log.go
@@ -0,0 +1,162 @@
+package logstore
+
+import (
+ "encoding/binary"
+ "fmt"
+ "github.com/parquet-go/parquet-go"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "github.com/seaweedfs/seaweedfs/weed/util/chunk_cache"
+ "github.com/seaweedfs/seaweedfs/weed/util/log_buffer"
+ "google.golang.org/protobuf/proto"
+ "io"
+ "math"
+ "strings"
+)
+
+var (
+ chunkCache = chunk_cache.NewChunkCacheInMemory(256) // 256 entries, 8MB max per entry
+)
+
+func GenParquetReadFunc(filerClient filer_pb.FilerClient, t topic.Topic, p topic.Partition) log_buffer.LogReadFromDiskFuncType {
+ partitionDir := topic.PartitionDir(t, p)
+
+ lookupFileIdFn := filer.LookupFn(filerClient)
+
+ // read topic conf from filer
+ var topicConf *mq_pb.ConfigureTopicResponse
+ var err error
+ if err := filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+ topicConf, err = t.ReadConfFile(client)
+ return err
+ }); err != nil {
+ return nil
+ }
+ recordType := topicConf.GetRecordType()
+ recordType = schema.NewRecordTypeBuilder(recordType).
+ WithField(SW_COLUMN_NAME_TS, schema.TypeInt64).
+ WithField(SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ RecordTypeEnd()
+
+ parquetSchema, err := schema.ToParquetSchema(t.Name, recordType)
+ if err != nil {
+ return nil
+ }
+ parquetLevels, err := schema.ToParquetLevels(recordType)
+ if err != nil {
+ return nil
+ }
+
+ // eachFileFn reads a parquet file and calls eachLogEntryFn for each log entry
+ eachFileFn := func(entry *filer_pb.Entry, eachLogEntryFn log_buffer.EachLogEntryFuncType, starTsNs, stopTsNs int64) (processedTsNs int64, err error) {
+ // create readerAt for the parquet file
+ fileSize := filer.FileSize(entry)
+ visibleIntervals, _ := filer.NonOverlappingVisibleIntervals(lookupFileIdFn, entry.Chunks, 0, int64(fileSize))
+ chunkViews := filer.ViewFromVisibleIntervals(visibleIntervals, 0, int64(fileSize))
+ readerCache := filer.NewReaderCache(32, chunkCache, lookupFileIdFn)
+ readerAt := filer.NewChunkReaderAtFromClient(readerCache, chunkViews, int64(fileSize))
+
+ // create parquet reader
+ parquetReader := parquet.NewReader(readerAt, parquetSchema)
+ rows := make([]parquet.Row, 128)
+ for {
+ rowCount, readErr := parquetReader.ReadRows(rows)
+
+ for i := 0; i < rowCount; i++ {
+ row := rows[i]
+ // convert parquet row to schema_pb.RecordValue
+ recordValue, err := schema.ToRecordValue(recordType, parquetLevels, row)
+ if err != nil {
+ return processedTsNs, fmt.Errorf("ToRecordValue failed: %v", err)
+ }
+ processedTsNs = recordValue.Fields[SW_COLUMN_NAME_TS].GetInt64Value()
+ if processedTsNs < starTsNs {
+ continue
+ }
+ if stopTsNs != 0 && processedTsNs >= stopTsNs {
+ return processedTsNs, nil
+ }
+
+ data, marshalErr := proto.Marshal(recordValue)
+ if marshalErr != nil {
+ return processedTsNs, fmt.Errorf("marshal record value: %v", marshalErr)
+ }
+
+ logEntry := &filer_pb.LogEntry{
+ Key: recordValue.Fields[SW_COLUMN_NAME_KEY].GetBytesValue(),
+ TsNs: processedTsNs,
+ Data: data,
+ }
+
+ // fmt.Printf(" parquet entry %s ts %v\n", string(logEntry.Key), time.Unix(0, logEntry.TsNs).UTC())
+
+ if _, err = eachLogEntryFn(logEntry); err != nil {
+ return processedTsNs, fmt.Errorf("process log entry %v: %v", logEntry, err)
+ }
+ }
+
+ if readErr != nil {
+ if readErr == io.EOF {
+ return processedTsNs, nil
+ }
+ return processedTsNs, readErr
+ }
+ }
+ return
+ }
+
+ return func(startPosition log_buffer.MessagePosition, stopTsNs int64, eachLogEntryFn log_buffer.EachLogEntryFuncType) (lastReadPosition log_buffer.MessagePosition, isDone bool, err error) {
+ startFileName := startPosition.UTC().Format(topic.TIME_FORMAT)
+ startTsNs := startPosition.Time.UnixNano()
+ var processedTsNs int64
+
+ err = filerClient.WithFilerClient(false, func(client filer_pb.SeaweedFilerClient) error {
+
+ return filer_pb.SeaweedList(client, partitionDir, "", func(entry *filer_pb.Entry, isLast bool) error {
+ if entry.IsDirectory {
+ return nil
+ }
+ if !strings.HasSuffix(entry.Name, ".parquet") {
+ return nil
+ }
+ if len(entry.Extended) == 0 {
+ return nil
+ }
+
+ // read minTs from the parquet file
+ minTsBytes := entry.Extended["min"]
+ if len(minTsBytes) != 8 {
+ return nil
+ }
+ minTsNs := int64(binary.BigEndian.Uint64(minTsBytes))
+
+ // read max ts
+ maxTsBytes := entry.Extended["max"]
+ if len(maxTsBytes) != 8 {
+ return nil
+ }
+ maxTsNs := int64(binary.BigEndian.Uint64(maxTsBytes))
+
+ if stopTsNs != 0 && stopTsNs <= minTsNs {
+ isDone = true
+ return nil
+ }
+
+ if maxTsNs < startTsNs {
+ return nil
+ }
+
+ if processedTsNs, err = eachFileFn(entry, eachLogEntryFn, startTsNs, stopTsNs); err != nil {
+ return err
+ }
+ return nil
+
+ }, startFileName, true, math.MaxInt32)
+ })
+ lastReadPosition = log_buffer.NewMessagePosition(processedTsNs, -2)
+ return
+ }
+}
diff --git a/weed/mq/pub_balancer/broker_stats.go b/weed/mq/pub_balancer/broker_stats.go
index c579c275e..e72703d5f 100644
--- a/weed/mq/pub_balancer/broker_stats.go
+++ b/weed/mq/pub_balancer/broker_stats.go
@@ -53,7 +53,7 @@ func (bs *BrokerStats) UpdateStats(stats *mq_pb.BrokerStats) {
}
publisherCount += topicPartitionStats.PublisherCount
subscriberCount += topicPartitionStats.SubscriberCount
- key := tps.TopicPartition.String()
+ key := tps.TopicPartition.TopicPartitionId()
bs.TopicPartitionStats.Set(key, tps)
delete(currentTopicPartitions, key)
}
@@ -79,7 +79,7 @@ func (bs *BrokerStats) RegisterAssignment(t *mq_pb.Topic, partition *mq_pb.Parti
PublisherCount: 0,
SubscriberCount: 0,
}
- key := tps.TopicPartition.String()
+ key := tps.TopicPartition.TopicPartitionId()
if isAdd {
bs.TopicPartitionStats.SetIfAbsent(key, tps)
} else {
diff --git a/weed/mq/schema/schema_builder.go b/weed/mq/schema/schema_builder.go
index db89ce34c..35272af47 100644
--- a/weed/mq/schema/schema_builder.go
+++ b/weed/mq/schema/schema_builder.go
@@ -19,10 +19,12 @@ type RecordTypeBuilder struct {
recordType *schema_pb.RecordType
}
+// RecordTypeBegin creates a new RecordTypeBuilder, it should be followed by a series of WithField methods and RecordTypeEnd
func RecordTypeBegin() *RecordTypeBuilder {
return &RecordTypeBuilder{recordType: &schema_pb.RecordType{}}
}
+// RecordTypeEnd finishes the building of a RecordValue
func (rtb *RecordTypeBuilder) RecordTypeEnd() *schema_pb.RecordType {
// be consistent with parquet.node.go `func (g Group) Fields() []Field`
sort.Slice(rtb.recordType.Fields, func(i, j int) bool {
@@ -31,6 +33,11 @@ func (rtb *RecordTypeBuilder) RecordTypeEnd() *schema_pb.RecordType {
return rtb.recordType
}
+// NewRecordTypeBuilder creates a new RecordTypeBuilder from an existing RecordType, it should be followed by a series of WithField methods and RecordTypeEnd
+func NewRecordTypeBuilder(recordType *schema_pb.RecordType) (rtb *RecordTypeBuilder) {
+ return &RecordTypeBuilder{recordType: recordType}
+}
+
func (rtb *RecordTypeBuilder) WithField(name string, scalarType *schema_pb.Type) *RecordTypeBuilder {
rtb.recordType.Fields = append(rtb.recordType.Fields, &schema_pb.Field{
Name: name,
diff --git a/weed/mq/sub_coordinator/partition_consumer_mapping.go b/weed/mq/sub_coordinator/partition_consumer_mapping.go
index 5d1cf158a..e900e4a33 100644
--- a/weed/mq/sub_coordinator/partition_consumer_mapping.go
+++ b/weed/mq/sub_coordinator/partition_consumer_mapping.go
@@ -11,13 +11,6 @@ type PartitionConsumerMapping struct {
prevMappings []*PartitionSlotToConsumerInstanceList
}
-func NewPartitionConsumerMapping(ringSize int32) *PartitionConsumerMapping {
- newVersion := time.Now().UnixNano()
- return &PartitionConsumerMapping{
- currentMapping: NewPartitionSlotToConsumerInstanceList(ringSize, newVersion),
- }
-}
-
// Balance goal:
// 1. max processing power utilization
// 2. allow one consumer instance to be down unexpectedly
@@ -27,8 +20,7 @@ func (pcm *PartitionConsumerMapping) BalanceToConsumerInstances(partitionSlotToB
if len(partitionSlotToBrokerList.PartitionSlots) == 0 || len(consumerInstances) == 0 {
return
}
- newVersion := time.Now().UnixNano()
- newMapping := NewPartitionSlotToConsumerInstanceList(partitionSlotToBrokerList.RingSize, newVersion)
+ newMapping := NewPartitionSlotToConsumerInstanceList(partitionSlotToBrokerList.RingSize, time.Now())
var prevMapping *PartitionSlotToConsumerInstanceList
if len(pcm.prevMappings) > 0 {
prevMapping = pcm.prevMappings[len(pcm.prevMappings)-1]
diff --git a/weed/mq/sub_coordinator/partition_list.go b/weed/mq/sub_coordinator/partition_list.go
index 384c1b875..16bf1ff0c 100644
--- a/weed/mq/sub_coordinator/partition_list.go
+++ b/weed/mq/sub_coordinator/partition_list.go
@@ -1,6 +1,6 @@
package sub_coordinator
-import "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+import "time"
type PartitionSlotToConsumerInstance struct {
RangeStart int32
@@ -17,17 +17,9 @@ type PartitionSlotToConsumerInstanceList struct {
Version int64
}
-func NewPartitionSlotToConsumerInstanceList(ringSize int32, version int64) *PartitionSlotToConsumerInstanceList {
+func NewPartitionSlotToConsumerInstanceList(ringSize int32, version time.Time) *PartitionSlotToConsumerInstanceList {
return &PartitionSlotToConsumerInstanceList{
RingSize: ringSize,
- Version: version,
+ Version: version.UnixNano(),
}
}
-
-func ToPartitions(ringSize int32, slots []*PartitionSlotToConsumerInstance) []*topic.Partition {
- partitions := make([]*topic.Partition, 0, len(slots))
- for _, slot := range slots {
- partitions = append(partitions, topic.NewPartition(slot.RangeStart, slot.RangeStop, ringSize, slot.UnixTimeNs))
- }
- return partitions
-}
diff --git a/weed/mq/topic/local_manager.go b/weed/mq/topic/local_manager.go
index d87eff911..9f273723d 100644
--- a/weed/mq/topic/local_manager.go
+++ b/weed/mq/topic/local_manager.go
@@ -88,7 +88,7 @@ func (manager *LocalTopicManager) CollectStats(duration time.Duration) *mq_pb.Br
Topic: Topic{Namespace: localTopic.Namespace, Name: localTopic.Name},
Partition: localPartition.Partition,
}
- stats.Stats[topicPartition.String()] = &mq_pb.TopicPartitionStats{
+ stats.Stats[topicPartition.TopicPartitionId()] = &mq_pb.TopicPartitionStats{
Topic: &mq_pb.Topic{
Namespace: string(localTopic.Namespace),
Name: localTopic.Name,
diff --git a/weed/mq/topic/local_partition.go b/weed/mq/topic/local_partition.go
index 8911c1841..e32fc2398 100644
--- a/weed/mq/topic/local_partition.go
+++ b/weed/mq/topic/local_partition.go
@@ -34,6 +34,7 @@ type LocalPartition struct {
}
var TIME_FORMAT = "2006-01-02-15-04-05"
+var PartitionGenerationFormat = "v2006-01-02-15-04-05"
func NewLocalPartition(partition Partition, logFlushFn log_buffer.LogFlushFuncType, readFromDiskFn log_buffer.LogReadFromDiskFuncType) *LocalPartition {
lp := &LocalPartition{
diff --git a/weed/mq/topic/partition.go b/weed/mq/topic/partition.go
index 192af6c98..7edf979b5 100644
--- a/weed/mq/topic/partition.go
+++ b/weed/mq/topic/partition.go
@@ -3,6 +3,7 @@ package topic
import (
"fmt"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ "time"
)
const PartitionCount = 4096
@@ -89,6 +90,19 @@ func (partition Partition) String() string {
return fmt.Sprintf("%04d-%04d", partition.RangeStart, partition.RangeStop)
}
-func ToString(partition *mq_pb.Partition) string {
- return fmt.Sprintf("%04d-%04d", partition.RangeStart, partition.RangeStop)
+func ParseTopicVersion(name string) (t time.Time, err error) {
+ return time.Parse(PartitionGenerationFormat, name)
+}
+
+func ParsePartitionBoundary(name string) (start, stop int32) {
+ _, err := fmt.Sscanf(name, "%04d-%04d", &start, &stop)
+ if err != nil {
+ return 0, 0
+ }
+ return start, stop
+}
+
+func PartitionDir(t Topic, p Partition) string {
+ partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(PartitionGenerationFormat)
+ return fmt.Sprintf("%s/%s/%04d-%04d", t.Dir(), partitionGeneration, p.RangeStart, p.RangeStop)
}
diff --git a/weed/mq/topic/topic.go b/weed/mq/topic/topic.go
index 6932fcb56..5e9012e70 100644
--- a/weed/mq/topic/topic.go
+++ b/weed/mq/topic/topic.go
@@ -1,8 +1,13 @@
package topic
import (
+ "bytes"
+ "errors"
"fmt"
+ "github.com/seaweedfs/seaweedfs/weed/filer"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
+ jsonpb "google.golang.org/protobuf/encoding/protojson"
)
type Topic struct {
@@ -23,13 +28,42 @@ func FromPbTopic(topic *mq_pb.Topic) Topic {
}
}
-func (tp Topic) ToPbTopic() *mq_pb.Topic {
+func (t Topic) ToPbTopic() *mq_pb.Topic {
return &mq_pb.Topic{
- Namespace: tp.Namespace,
- Name: tp.Name,
+ Namespace: t.Namespace,
+ Name: t.Name,
}
}
-func (tp Topic) String() string {
- return fmt.Sprintf("%s.%s", tp.Namespace, tp.Name)
+func (t Topic) String() string {
+ return fmt.Sprintf("%s.%s", t.Namespace, t.Name)
+}
+
+func (t Topic) Dir() string {
+ return fmt.Sprintf("%s/%s/%s", filer.TopicsDir, t.Namespace, t.Name)
+}
+
+func (t Topic) ReadConfFile(client filer_pb.SeaweedFilerClient) (*mq_pb.ConfigureTopicResponse, error) {
+ data, err := filer.ReadInsideFiler(client, t.Dir(), filer.TopicConfFile)
+ if errors.Is(err, filer_pb.ErrNotFound) {
+ return nil, err
+ }
+ if err != nil {
+ return nil, fmt.Errorf("read topic.conf of %v: %v", t, err)
+ }
+ // parse into filer conf object
+ conf := &mq_pb.ConfigureTopicResponse{}
+ if err = jsonpb.Unmarshal(data, conf); err != nil {
+ return nil, fmt.Errorf("unmarshal topic %v conf: %v", t, err)
+ }
+ return conf, nil
+}
+
+func (t Topic) WriteConfFile(client filer_pb.SeaweedFilerClient, conf *mq_pb.ConfigureTopicResponse) error {
+ var buf bytes.Buffer
+ filer.ProtoToText(&buf, conf)
+ if err := filer.SaveInsideFiler(client, t.Dir(), filer.TopicConfFile, buf.Bytes()); err != nil {
+ return fmt.Errorf("save topic %v conf: %v", t, err)
+ }
+ return nil
}
diff --git a/weed/mq/topic/topic_partition.go b/weed/mq/topic/topic_partition.go
index 20b33a7e4..b14bc9c46 100644
--- a/weed/mq/topic/topic_partition.go
+++ b/weed/mq/topic/topic_partition.go
@@ -7,6 +7,6 @@ type TopicPartition struct {
Partition
}
-func (tp *TopicPartition) String() string {
+func (tp *TopicPartition) TopicPartitionId() string {
return fmt.Sprintf("%v.%v-%04d-%04d", tp.Namespace, tp.Topic, tp.RangeStart, tp.RangeStop)
}
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 73e50cc48..9470afced 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -19,19 +19,15 @@ import (
)
type FilePart struct {
- Reader io.Reader
- FileName string
- FileSize int64
- MimeType string
- ModTime int64 //in seconds
- Replication string
- Collection string
- DataCenter string
- Ttl string
- DiskType string
- Server string //this comes from assign result
- Fid string //this comes from assign result, but customizable
- Fsync bool
+ Reader io.Reader
+ FileName string
+ FileSize int64
+ MimeType string
+ ModTime int64 //in seconds
+ Pref StoragePreference
+ Server string //this comes from assign result
+ Fid string //this comes from assign result, but customizable
+ Fsync bool
}
type SubmitResult struct {
@@ -42,20 +38,29 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
+type StoragePreference struct {
+ Replication string
+ Collection string
+ DataCenter string
+ Ttl string
+ DiskType string
+ MaxMB int
+}
+
type GetMasterFn func(ctx context.Context) pb.ServerAddress
-func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []FilePart, replication string, collection string, dataCenter string, ttl string, diskType string, maxMB int, usePublicUrl bool) ([]SubmitResult, error) {
+func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []*FilePart, pref StoragePreference, usePublicUrl bool) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
}
ar := &VolumeAssignRequest{
Count: uint64(len(files)),
- Replication: replication,
- Collection: collection,
- DataCenter: dataCenter,
- Ttl: ttl,
- DiskType: diskType,
+ Replication: pref.Replication,
+ Collection: pref.Collection,
+ DataCenter: pref.DataCenter,
+ Ttl: pref.Ttl,
+ DiskType: pref.DiskType,
}
ret, err := Assign(masterFn, grpcDialOption, ar)
if err != nil {
@@ -73,12 +78,8 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []F
if usePublicUrl {
file.Server = ret.PublicUrl
}
- file.Replication = replication
- file.Collection = collection
- file.DataCenter = dataCenter
- file.Ttl = ttl
- file.DiskType = diskType
- results[index].Size, err = file.Upload(maxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
+ file.Pref = pref
+ results[index].Size, err = file.Upload(pref.MaxMB, masterFn, usePublicUrl, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -88,8 +89,8 @@ func SubmitFiles(masterFn GetMasterFn, grpcDialOption grpc.DialOption, files []F
return results, nil
}
-func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
- ret = make([]FilePart, len(fullPathFilenames))
+func NewFileParts(fullPathFilenames []string) (ret []*FilePart, err error) {
+ ret = make([]*FilePart, len(fullPathFilenames))
for index, file := range fullPathFilenames {
if ret[index], err = newFilePart(file); err != nil {
return
@@ -97,7 +98,8 @@ func NewFileParts(fullPathFilenames []string) (ret []FilePart, err error) {
}
return
}
-func newFilePart(fullPathFilename string) (ret FilePart, err error) {
+func newFilePart(fullPathFilename string) (ret *FilePart, err error) {
+ ret = &FilePart{}
fh, openErr := os.Open(fullPathFilename)
if openErr != nil {
glog.V(0).Info("Failed to open file: ", fullPathFilename)
@@ -121,7 +123,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
+func (fi *FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -145,13 +147,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw
var ret *AssignResult
var id string
- if fi.DataCenter != "" {
+ if fi.Pref.DataCenter != "" {
ar := &VolumeAssignRequest{
Count: uint64(chunks),
- Replication: fi.Replication,
- Collection: fi.Collection,
- Ttl: fi.Ttl,
- DiskType: fi.DiskType,
+ Replication: fi.Pref.Replication,
+ Collection: fi.Pref.Collection,
+ Ttl: fi.Pref.Ttl,
+ DiskType: fi.Pref.DiskType,
}
ret, err = Assign(masterFn, grpcDialOption, ar)
if err != nil {
@@ -159,13 +161,13 @@ func (fi FilePart) Upload(maxMB int, masterFn GetMasterFn, usePublicUrl bool, jw
}
}
for i := int64(0); i < chunks; i++ {
- if fi.DataCenter == "" {
+ if fi.Pref.DataCenter == "" {
ar := &VolumeAssignRequest{
Count: 1,
- Replication: fi.Replication,
- Collection: fi.Collection,
- Ttl: fi.Ttl,
- DiskType: fi.DiskType,
+ Replication: fi.Pref.Replication,
+ Collection: fi.Pref.Collection,
+ Ttl: fi.Pref.Ttl,
+ DiskType: fi.Pref.DiskType,
}
ret, err = Assign(masterFn, grpcDialOption, ar)
if err != nil {
diff --git a/weed/shell/command_mq_topic_compact.go b/weed/shell/command_mq_topic_compact.go
new file mode 100644
index 000000000..f1dee8662
--- /dev/null
+++ b/weed/shell/command_mq_topic_compact.go
@@ -0,0 +1,94 @@
+package shell
+
+import (
+ "flag"
+ "github.com/seaweedfs/seaweedfs/weed/filer_client"
+ "github.com/seaweedfs/seaweedfs/weed/mq/logstore"
+ "github.com/seaweedfs/seaweedfs/weed/mq/schema"
+ "github.com/seaweedfs/seaweedfs/weed/mq/topic"
+ "github.com/seaweedfs/seaweedfs/weed/operation"
+ "github.com/seaweedfs/seaweedfs/weed/pb"
+ "google.golang.org/grpc"
+ "io"
+ "time"
+)
+
+func init() {
+ Commands = append(Commands, &commandMqTopicCompact{})
+}
+
+type commandMqTopicCompact struct {
+}
+
+func (c *commandMqTopicCompact) Name() string {
+ return "mq.topic.compact"
+}
+
+func (c *commandMqTopicCompact) Help() string {
+ return `compact the topic storage into parquet format
+
+ Example:
+ mq.topic.compact -namespace <namespace> -topic <topic_name> -timeAgo <time_ago>
+
+`
+}
+
+func (c *commandMqTopicCompact) HasTag(tag CommandTag) bool {
+ return ResourceHeavy == tag
+}
+
+func (c *commandMqTopicCompact) Do(args []string, commandEnv *CommandEnv, writer io.Writer) error {
+
+ // parse parameters
+ mqCommand := flag.NewFlagSet(c.Name(), flag.ContinueOnError)
+ namespace := mqCommand.String("namespace", "", "namespace name")
+ topicName := mqCommand.String("topic", "", "topic name")
+ timeAgo := mqCommand.Duration("timeAgo", 2*time.Minute, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+ replication := mqCommand.String("replication", "", "replication type")
+ collection := mqCommand.String("collection", "", "optional collection name")
+ dataCenter := mqCommand.String("dataCenter", "", "optional data center name")
+ diskType := mqCommand.String("disk", "", "[hdd|ssd|<tag>] hard drive or solid state drive or any tag")
+ maxMB := mqCommand.Int("maxMB", 4, "split files larger than the limit")
+
+ if err := mqCommand.Parse(args); err != nil {
+ return err
+ }
+
+ storagePreference := &operation.StoragePreference{
+ Replication: *replication,
+ Collection: *collection,
+ DataCenter: *dataCenter,
+ DiskType: *diskType,
+ MaxMB: *maxMB,
+ }
+
+ // read topic configuration
+ fca := &filer_client.FilerClientAccessor{
+ GetFiler: func() pb.ServerAddress {
+ return commandEnv.option.FilerAddress
+ },
+ GetGrpcDialOption: func() grpc.DialOption {
+ return commandEnv.option.GrpcDialOption
+ },
+ }
+ t := topic.NewTopic(*namespace, *topicName)
+ topicConf, err := fca.ReadTopicConfFromFiler(t)
+ if err != nil {
+ return err
+ }
+
+ // get record type
+ recordType := topicConf.GetRecordType()
+ recordType = schema.NewRecordTypeBuilder(recordType).
+ WithField(logstore.SW_COLUMN_NAME_TS, schema.TypeInt64).
+ WithField(logstore.SW_COLUMN_NAME_KEY, schema.TypeBytes).
+ RecordTypeEnd()
+
+ // compact the topic partition versions
+ if err = logstore.CompactTopicPartitions(commandEnv, t, *timeAgo, recordType, storagePreference); err != nil {
+ return err
+ }
+
+ return nil
+
+}
diff --git a/weed/util/chunk_cache/chunk_cache_in_memory.go b/weed/util/chunk_cache/chunk_cache_in_memory.go
index 2982d0979..dd101996e 100644
--- a/weed/util/chunk_cache/chunk_cache_in_memory.go
+++ b/weed/util/chunk_cache/chunk_cache_in_memory.go
@@ -5,11 +5,31 @@ import (
"time"
)
+var (
+ _ ChunkCache = &ChunkCacheInMemory{}
+)
+
// a global cache for recently accessed file chunks
type ChunkCacheInMemory struct {
cache *ccache.Cache
}
+func (c *ChunkCacheInMemory) ReadChunkAt(data []byte, fileId string, offset uint64) (n int, err error) {
+ return c.readChunkAt(data, fileId, offset)
+}
+
+func (c *ChunkCacheInMemory) IsInCache(fileId string, lockNeeded bool) (answer bool) {
+ item := c.cache.Get(fileId)
+ if item == nil {
+ return false
+ }
+ return true
+}
+
+func (c *ChunkCacheInMemory) GetMaxFilePartSizeInCache() (answer uint64) {
+ return 8 * 1024 * 1024
+}
+
func NewChunkCacheInMemory(maxEntries int64) *ChunkCacheInMemory {
pruneCount := maxEntries >> 3
if pruneCount <= 0 {