aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/partition.go
blob: 658ec85c471f6a002845b61a2f6a470c74b8a71b (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
package topic

import (
	"fmt"
	"time"

	"github.com/seaweedfs/seaweedfs/weed/pb/schema_pb"
)

const PartitionCount = 4096

type Partition struct {
	RangeStart int32
	RangeStop  int32 // exclusive
	RingSize   int32
	UnixTimeNs int64 // in nanoseconds
}

func NewPartition(rangeStart, rangeStop, ringSize int32, unixTimeNs int64) *Partition {
	return &Partition{
		RangeStart: rangeStart,
		RangeStop:  rangeStop,
		RingSize:   ringSize,
		UnixTimeNs: unixTimeNs,
	}
}

func (partition Partition) Equals(other Partition) bool {
	if partition.RangeStart != other.RangeStart {
		return false
	}
	if partition.RangeStop != other.RangeStop {
		return false
	}
	if partition.RingSize != other.RingSize {
		return false
	}
	if partition.UnixTimeNs != other.UnixTimeNs {
		return false
	}
	return true
}

// LogicalEquals compares only the partition boundaries (RangeStart, RangeStop)
// This is useful when comparing partitions that may have different timestamps or ring sizes
// but represent the same logical partition range
func (partition Partition) LogicalEquals(other Partition) bool {
	return partition.RangeStart == other.RangeStart && partition.RangeStop == other.RangeStop
}

func FromPbPartition(partition *schema_pb.Partition) Partition {
	return Partition{
		RangeStart: partition.RangeStart,
		RangeStop:  partition.RangeStop,
		RingSize:   partition.RingSize,
		UnixTimeNs: partition.UnixTimeNs,
	}
}

func SplitPartitions(targetCount int32, ts int64) []*Partition {
	partitions := make([]*Partition, 0, targetCount)
	partitionSize := PartitionCount / targetCount
	for i := int32(0); i < targetCount; i++ {
		partitionStop := (i + 1) * partitionSize
		if i == targetCount-1 {
			partitionStop = PartitionCount
		}
		partitions = append(partitions, &Partition{
			RangeStart: i * partitionSize,
			RangeStop:  partitionStop,
			RingSize:   PartitionCount,
			UnixTimeNs: ts,
		})
	}
	return partitions
}

func (partition Partition) ToPbPartition() *schema_pb.Partition {
	return &schema_pb.Partition{
		RangeStart: partition.RangeStart,
		RangeStop:  partition.RangeStop,
		RingSize:   partition.RingSize,
		UnixTimeNs: partition.UnixTimeNs,
	}
}

func (partition Partition) Overlaps(partition2 Partition) bool {
	if partition.RangeStart >= partition2.RangeStop {
		return false
	}
	if partition.RangeStop <= partition2.RangeStart {
		return false
	}
	return true
}

func (partition Partition) String() string {
	return fmt.Sprintf("%04d-%04d", partition.RangeStart, partition.RangeStop)
}

func ParseTopicVersion(name string) (t time.Time, err error) {
	return time.Parse(PartitionGenerationFormat, name)
}

func ParsePartitionBoundary(name string) (start, stop int32) {
	_, err := fmt.Sscanf(name, "%04d-%04d", &start, &stop)
	if err != nil {
		return 0, 0
	}
	return start, stop
}

func PartitionDir(t Topic, p Partition) string {
	partitionGeneration := time.Unix(0, p.UnixTimeNs).UTC().Format(PartitionGenerationFormat)
	return fmt.Sprintf("%s/%s/%04d-%04d", t.Dir(), partitionGeneration, p.RangeStart, p.RangeStop)
}