aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/topic/partition.go
blob: ba1accce10ee98ff6d50bca33c452435e197e02f (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
package topic

import "github.com/seaweedfs/seaweedfs/weed/pb/mq_pb"

// PartitionCount is the number of slots in the partition ring. SplitPartitions
// stores it as the RingSize of every partition it creates and uses it as the
// exclusive upper bound of the last range.
const PartitionCount = 4096

// Partition describes a contiguous slot range [RangeStart, RangeStop) together
// with the ring size and a timestamp. Equals compares all four fields, while
// Overlaps looks only at the range bounds.
type Partition struct {
	RangeStart int32
	RangeStop  int32 // exclusive
	RingSize   int32
	UnixTimeNs int64 // in nanoseconds
}

// NewPartition returns a pointer to a freshly allocated Partition whose fields
// are set from the arguments. No validation is performed on the values.
func NewPartition(rangeStart, rangeStop, ringSize int32, unixTimeNs int64) *Partition {
	p := new(Partition)
	p.RangeStart, p.RangeStop = rangeStart, rangeStop
	p.RingSize = ringSize
	p.UnixTimeNs = unixTimeNs
	return p
}

// Equals reports whether partition and other agree on every field: range
// start, range stop, ring size and timestamp.
func (partition Partition) Equals(other Partition) bool {
	sameRange := partition.RangeStart == other.RangeStart &&
		partition.RangeStop == other.RangeStop
	sameRing := partition.RingSize == other.RingSize
	sameTime := partition.UnixTimeNs == other.UnixTimeNs
	return sameRange && sameRing && sameTime
}

// FromPbPartition copies the four fields of a protobuf partition message into
// a Partition value. The pointer is dereferenced directly, so the caller must
// pass a non-nil message.
func FromPbPartition(partition *mq_pb.Partition) Partition {
	var converted Partition
	converted.RangeStart = partition.RangeStart
	converted.RangeStop = partition.RangeStop
	converted.RingSize = partition.RingSize
	converted.UnixTimeNs = partition.UnixTimeNs
	return converted
}

// SplitPartitions divides the ring of PartitionCount slots into targetCount
// consecutive, non-overlapping partitions, each stamped with ts as its
// UnixTimeNs and PartitionCount as its RingSize.
//
// Every partition is PartitionCount/targetCount slots wide (integer division),
// except the last, whose RangeStop is pinned to PartitionCount so that any
// remainder is absorbed and the whole ring is covered.
//
// A non-positive targetCount yields nil rather than panicking. If targetCount
// is larger than PartitionCount the slot width truncates to zero, so every
// partition but the last is an empty range.
func SplitPartitions(targetCount int32, ts int64) []*Partition {
	// Guard against a divide by zero (targetCount == 0) and a negative
	// capacity passed to make (targetCount < 0).
	if targetCount <= 0 {
		return nil
	}
	partitions := make([]*Partition, 0, targetCount)
	partitionSize := PartitionCount / targetCount
	for i := int32(0); i < targetCount; i++ {
		partitionStop := (i + 1) * partitionSize
		if i == targetCount-1 {
			// The last partition takes the slots left over by the division.
			partitionStop = PartitionCount
		}
		partitions = append(partitions, &Partition{
			RangeStart: i * partitionSize,
			RangeStop:  partitionStop,
			RingSize:   PartitionCount,
			UnixTimeNs: ts,
		})
	}
	return partitions
}

// ToPbPartition builds a new protobuf partition message carrying the same
// range bounds, ring size and timestamp as partition.
func (partition Partition) ToPbPartition() *mq_pb.Partition {
	pb := new(mq_pb.Partition)
	pb.RangeStart = partition.RangeStart
	pb.RangeStop = partition.RangeStop
	pb.RingSize = partition.RingSize
	pb.UnixTimeNs = partition.UnixTimeNs
	return pb
}

// Overlaps reports whether the half-open ranges [RangeStart, RangeStop) of
// partition and partition2 share at least one slot. Only the range bounds are
// compared; RingSize and UnixTimeNs are ignored.
func (partition Partition) Overlaps(partition2 Partition) bool {
	startsBeforeOtherEnds := partition.RangeStart < partition2.RangeStop
	endsAfterOtherStarts := partition.RangeStop > partition2.RangeStart
	return startsBeforeOtherEnds && endsAfterOtherStarts
}