aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorchrislu <chris.lu@gmail.com>2024-03-17 11:46:55 -0700
committerchrislu <chris.lu@gmail.com>2024-03-17 11:46:55 -0700
commita375b2815e8d8be6b5a7c11318a0d1045ae6e135 (patch)
tree20a4ddaac6f043e0e625546bbc2d01e092b5e35f
parent8e5068fd2fe6e4cc8af23d5a8a50101a657af0c5 (diff)
parentca042bd067a745f5f2eda3943a98e64564f27f6a (diff)
downloadseaweedfs-a375b2815e8d8be6b5a7c11318a0d1045ae6e135.tar.xz
seaweedfs-a375b2815e8d8be6b5a7c11318a0d1045ae6e135.zip
Merge branch 'master' into mq-subscribe
-rw-r--r--weed/mq/broker/broker_test.go47
-rw-r--r--weed/mq/pub_balancer/allocate_test.go1
-rw-r--r--weed/shell/command_volume_balance_test.go2
-rw-r--r--weed/shell/command_volume_tier_move.go4
-rw-r--r--weed/util/log_buffer/log_buffer_test.go10
5 files changed, 8 insertions, 56 deletions
diff --git a/weed/mq/broker/broker_test.go b/weed/mq/broker/broker_test.go
deleted file mode 100644
index 3d0b0a19c..000000000
--- a/weed/mq/broker/broker_test.go
+++ /dev/null
@@ -1,47 +0,0 @@
-package broker
-
-import (
- "context"
- "fmt"
- "net"
- "testing"
- "time"
-
- "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"
- "google.golang.org/grpc"
- "google.golang.org/grpc/test/bufconn"
-)
-
-var lis *bufconn.Listener
-
-func init() {
- lis = bufconn.Listen(1024 * 1024)
- server := grpc.NewServer()
- mq_pb.RegisterSeaweedMessagingServer(server, &MessageQueueBroker{})
- go func() {
- if err := server.Serve(lis); err != nil {
- fmt.Printf("Server exited with error: %v", err)
- }
- }()
-}
-
-func bufDialer(string, time.Duration) (net.Conn, error) {
- return lis.Dial()
-}
-
-func TestMessageQueueBroker_ListTopics(t *testing.T) {
- conn, err := grpc.DialContext(context.Background(), "bufnet", grpc.WithDialer(bufDialer), grpc.WithInsecure())
- if err != nil {
- t.Fatalf("Failed to dial bufnet: %v", err)
- }
- defer conn.Close()
-
- client := mq_pb.NewSeaweedMessagingClient(conn)
- request := &mq_pb.ListTopicsRequest{}
-
- _, err = client.ListTopics(context.Background(), request)
- if err == nil {
- t.Fatalf("Add failed: %v", err)
- }
-
-}
diff --git a/weed/mq/pub_balancer/allocate_test.go b/weed/mq/pub_balancer/allocate_test.go
index 89a6bb23c..3f1aa4fbf 100644
--- a/weed/mq/pub_balancer/allocate_test.go
+++ b/weed/mq/pub_balancer/allocate_test.go
@@ -61,7 +61,6 @@ func testThem(t *testing.T, tests []struct {
assert.Equal(t, tt.wantAssignments[i].Partition.RangeStart, gotAssignment.Partition.RangeStart)
assert.Equal(t, tt.wantAssignments[i].Partition.RangeStop, gotAssignment.Partition.RangeStop)
assert.Equal(t, tt.wantAssignments[i].Partition.RingSize, gotAssignment.Partition.RingSize)
- assert.Equal(t, tt.wantAssignments[i].Partition.UnixTimeNs, gotAssignment.Partition.UnixTimeNs)
}
})
}
diff --git a/weed/shell/command_volume_balance_test.go b/weed/shell/command_volume_balance_test.go
index ce0aeb5ab..d533269a4 100644
--- a/weed/shell/command_volume_balance_test.go
+++ b/weed/shell/command_volume_balance_test.go
@@ -264,7 +264,7 @@ func TestBalance(t *testing.T) {
func TestVolumeSelection(t *testing.T) {
topologyInfo := parseOutput(topoData)
- vids, err := collectVolumeIdsForTierChange(nil, topologyInfo, 1000, types.ToDiskType("hdd"), "", 20.0, 0)
+ vids, err := collectVolumeIdsForTierChange(topologyInfo, 1000, types.ToDiskType("hdd"), "", 20.0, 0)
if err != nil {
t.Errorf("collectVolumeIdsForTierChange: %v", err)
}
diff --git a/weed/shell/command_volume_tier_move.go b/weed/shell/command_volume_tier_move.go
index bf41c2ea0..e6cf4ee02 100644
--- a/weed/shell/command_volume_tier_move.go
+++ b/weed/shell/command_volume_tier_move.go
@@ -88,7 +88,7 @@ func (c *commandVolumeTierMove) Do(args []string, commandEnv *CommandEnv, writer
}
// collect all volumes that should change
- volumeIds, err := collectVolumeIdsForTierChange(commandEnv, topologyInfo, volumeSizeLimitMb, fromDiskType, *collectionPattern, *fullPercentage, *quietPeriod)
+ volumeIds, err := collectVolumeIdsForTierChange(topologyInfo, volumeSizeLimitMb, fromDiskType, *collectionPattern, *fullPercentage, *quietPeriod)
if err != nil {
return err
}
@@ -279,7 +279,7 @@ func (c *commandVolumeTierMove) doMoveOneVolume(commandEnv *CommandEnv, writer i
return nil
}
-func collectVolumeIdsForTierChange(commandEnv *CommandEnv, topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
+func collectVolumeIdsForTierChange(topologyInfo *master_pb.TopologyInfo, volumeSizeLimitMb uint64, sourceTier types.DiskType, collectionPattern string, fullPercentage float64, quietPeriod time.Duration) (vids []needle.VolumeId, err error) {
quietSeconds := int64(quietPeriod / time.Second)
nowUnixSeconds := time.Now().Unix()
diff --git a/weed/util/log_buffer/log_buffer_test.go b/weed/util/log_buffer/log_buffer_test.go
index ac46a096c..84279f625 100644
--- a/weed/util/log_buffer/log_buffer_test.go
+++ b/weed/util/log_buffer/log_buffer_test.go
@@ -13,12 +13,12 @@ import (
func TestNewLogBufferFirstBuffer(t *testing.T) {
flushInterval := time.Second
- lb := NewLogBuffer("test", flushInterval, func(startTime, stopTime time.Time, buf []byte) {
+ lb := NewLogBuffer("test", flushInterval, func(logBuffer *LogBuffer, startTime time.Time, stopTime time.Time, buf []byte) {
fmt.Printf("flush from %v to %v %d bytes\n", startTime, stopTime, len(buf))
}, nil, func() {
})
- startTime := time.Now()
+ startTime := MessagePosition{Time:time.Now()}
messageSize := 1024
messageCount := 5000
@@ -31,13 +31,13 @@ func TestNewLogBufferFirstBuffer(t *testing.T) {
lastProcessedTime, isDone, err := lb.LoopProcessLogData("test", startTime, 0, func() bool {
// stop if no more messages
return receivedMessageCount < messageCount
- }, func(logEntry *filer_pb.LogEntry) error {
+ }, func(logEntry *filer_pb.LogEntry) (isDone bool, err error) {
receivedMessageCount++
if receivedMessageCount >= messageCount {
println("processed all messages")
- return io.EOF
+ return true, io.EOF
}
- return nil
+ return false,nil
})
fmt.Printf("before flush: sent %d received %d\n", messageCount, receivedMessageCount)