aboutsummaryrefslogtreecommitdiff
path: root/weed/mq/kafka/protocol/metrics.go
diff options
context:
space:
mode:
Diffstat (limited to 'weed/mq/kafka/protocol/metrics.go')
-rw-r--r--weed/mq/kafka/protocol/metrics.go233
1 files changed, 233 insertions, 0 deletions
diff --git a/weed/mq/kafka/protocol/metrics.go b/weed/mq/kafka/protocol/metrics.go
new file mode 100644
index 000000000..b4bcd98dd
--- /dev/null
+++ b/weed/mq/kafka/protocol/metrics.go
@@ -0,0 +1,233 @@
+package protocol
+
+import (
+ "sync"
+ "sync/atomic"
+ "time"
+)
+
+// Metrics tracks basic request/error/latency statistics for Kafka protocol operations
+type Metrics struct {
+ // Request counters by API key
+ requestCounts map[uint16]*int64
+ errorCounts map[uint16]*int64
+
+ // Latency tracking
+ latencySum map[uint16]*int64 // Total latency in microseconds
+ latencyCount map[uint16]*int64 // Number of requests for average calculation
+
+ // Connection metrics
+ activeConnections int64
+ totalConnections int64
+
+ // Mutex for map operations
+ mu sync.RWMutex
+
+ // Start time for uptime calculation
+ startTime time.Time
+}
+
+// APIMetrics represents metrics for a specific API
+type APIMetrics struct {
+ APIKey uint16 `json:"api_key"`
+ APIName string `json:"api_name"`
+ RequestCount int64 `json:"request_count"`
+ ErrorCount int64 `json:"error_count"`
+ AvgLatencyMs float64 `json:"avg_latency_ms"`
+}
+
+// ConnectionMetrics represents connection-related metrics
+type ConnectionMetrics struct {
+ ActiveConnections int64 `json:"active_connections"`
+ TotalConnections int64 `json:"total_connections"`
+ UptimeSeconds int64 `json:"uptime_seconds"`
+ StartTime time.Time `json:"start_time"`
+}
+
+// MetricsSnapshot represents a complete metrics snapshot
+type MetricsSnapshot struct {
+ APIs []APIMetrics `json:"apis"`
+ Connections ConnectionMetrics `json:"connections"`
+ Timestamp time.Time `json:"timestamp"`
+}
+
+// NewMetrics creates a new metrics tracker
+func NewMetrics() *Metrics {
+ return &Metrics{
+ requestCounts: make(map[uint16]*int64),
+ errorCounts: make(map[uint16]*int64),
+ latencySum: make(map[uint16]*int64),
+ latencyCount: make(map[uint16]*int64),
+ startTime: time.Now(),
+ }
+}
+
+// RecordRequest records a successful request with latency
+func (m *Metrics) RecordRequest(apiKey uint16, latency time.Duration) {
+ m.ensureCounters(apiKey)
+
+ atomic.AddInt64(m.requestCounts[apiKey], 1)
+ atomic.AddInt64(m.latencySum[apiKey], latency.Microseconds())
+ atomic.AddInt64(m.latencyCount[apiKey], 1)
+}
+
+// RecordError records an error for a specific API
+func (m *Metrics) RecordError(apiKey uint16, latency time.Duration) {
+ m.ensureCounters(apiKey)
+
+ atomic.AddInt64(m.requestCounts[apiKey], 1)
+ atomic.AddInt64(m.errorCounts[apiKey], 1)
+ atomic.AddInt64(m.latencySum[apiKey], latency.Microseconds())
+ atomic.AddInt64(m.latencyCount[apiKey], 1)
+}
+
+// RecordConnection records a new connection
+func (m *Metrics) RecordConnection() {
+ atomic.AddInt64(&m.activeConnections, 1)
+ atomic.AddInt64(&m.totalConnections, 1)
+}
+
+// RecordDisconnection records a connection closure
+func (m *Metrics) RecordDisconnection() {
+ atomic.AddInt64(&m.activeConnections, -1)
+}
+
+// GetSnapshot returns a complete metrics snapshot
+func (m *Metrics) GetSnapshot() MetricsSnapshot {
+ m.mu.RLock()
+ defer m.mu.RUnlock()
+
+ apis := make([]APIMetrics, 0, len(m.requestCounts))
+
+ for apiKey, requestCount := range m.requestCounts {
+ requests := atomic.LoadInt64(requestCount)
+ errors := atomic.LoadInt64(m.errorCounts[apiKey])
+ latencySum := atomic.LoadInt64(m.latencySum[apiKey])
+ latencyCount := atomic.LoadInt64(m.latencyCount[apiKey])
+
+ var avgLatencyMs float64
+ if latencyCount > 0 {
+ avgLatencyMs = float64(latencySum) / float64(latencyCount) / 1000.0 // Convert to milliseconds
+ }
+
+ apis = append(apis, APIMetrics{
+ APIKey: apiKey,
+ APIName: getAPIName(APIKey(apiKey)),
+ RequestCount: requests,
+ ErrorCount: errors,
+ AvgLatencyMs: avgLatencyMs,
+ })
+ }
+
+ return MetricsSnapshot{
+ APIs: apis,
+ Connections: ConnectionMetrics{
+ ActiveConnections: atomic.LoadInt64(&m.activeConnections),
+ TotalConnections: atomic.LoadInt64(&m.totalConnections),
+ UptimeSeconds: int64(time.Since(m.startTime).Seconds()),
+ StartTime: m.startTime,
+ },
+ Timestamp: time.Now(),
+ }
+}
+
+// GetAPIMetrics returns metrics for a specific API
+func (m *Metrics) GetAPIMetrics(apiKey uint16) APIMetrics {
+ m.ensureCounters(apiKey)
+
+ requests := atomic.LoadInt64(m.requestCounts[apiKey])
+ errors := atomic.LoadInt64(m.errorCounts[apiKey])
+ latencySum := atomic.LoadInt64(m.latencySum[apiKey])
+ latencyCount := atomic.LoadInt64(m.latencyCount[apiKey])
+
+ var avgLatencyMs float64
+ if latencyCount > 0 {
+ avgLatencyMs = float64(latencySum) / float64(latencyCount) / 1000.0
+ }
+
+ return APIMetrics{
+ APIKey: apiKey,
+ APIName: getAPIName(APIKey(apiKey)),
+ RequestCount: requests,
+ ErrorCount: errors,
+ AvgLatencyMs: avgLatencyMs,
+ }
+}
+
+// GetConnectionMetrics returns connection-related metrics
+func (m *Metrics) GetConnectionMetrics() ConnectionMetrics {
+ return ConnectionMetrics{
+ ActiveConnections: atomic.LoadInt64(&m.activeConnections),
+ TotalConnections: atomic.LoadInt64(&m.totalConnections),
+ UptimeSeconds: int64(time.Since(m.startTime).Seconds()),
+ StartTime: m.startTime,
+ }
+}
+
+// Reset resets all metrics (useful for testing)
+func (m *Metrics) Reset() {
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ for apiKey := range m.requestCounts {
+ atomic.StoreInt64(m.requestCounts[apiKey], 0)
+ atomic.StoreInt64(m.errorCounts[apiKey], 0)
+ atomic.StoreInt64(m.latencySum[apiKey], 0)
+ atomic.StoreInt64(m.latencyCount[apiKey], 0)
+ }
+
+ atomic.StoreInt64(&m.activeConnections, 0)
+ atomic.StoreInt64(&m.totalConnections, 0)
+ m.startTime = time.Now()
+}
+
+// ensureCounters ensures that counters exist for the given API key
+func (m *Metrics) ensureCounters(apiKey uint16) {
+ m.mu.RLock()
+ if _, exists := m.requestCounts[apiKey]; exists {
+ m.mu.RUnlock()
+ return
+ }
+ m.mu.RUnlock()
+
+ m.mu.Lock()
+ defer m.mu.Unlock()
+
+ // Double-check after acquiring write lock
+ if _, exists := m.requestCounts[apiKey]; exists {
+ return
+ }
+
+ m.requestCounts[apiKey] = new(int64)
+ m.errorCounts[apiKey] = new(int64)
+ m.latencySum[apiKey] = new(int64)
+ m.latencyCount[apiKey] = new(int64)
+}
+
+// Global metrics instance
+var globalMetrics = NewMetrics()
+
+// GetGlobalMetrics returns the global metrics instance
+func GetGlobalMetrics() *Metrics {
+ return globalMetrics
+}
+
+// RecordRequestMetrics is a convenience function to record request metrics globally
+func RecordRequestMetrics(apiKey uint16, latency time.Duration) {
+ globalMetrics.RecordRequest(apiKey, latency)
+}
+
+// RecordErrorMetrics is a convenience function to record error metrics globally
+func RecordErrorMetrics(apiKey uint16, latency time.Duration) {
+ globalMetrics.RecordError(apiKey, latency)
+}
+
+// RecordConnectionMetrics is a convenience function to record connection metrics globally
+func RecordConnectionMetrics() {
+ globalMetrics.RecordConnection()
+}
+
+// RecordDisconnectionMetrics is a convenience function to record disconnection metrics globally
+func RecordDisconnectionMetrics() {
+ globalMetrics.RecordDisconnection()
+}