aboutsummaryrefslogtreecommitdiff
path: root/weed/util
diff options
context:
space:
mode:
Diffstat (limited to 'weed/util')
-rw-r--r--weed/util/log_buffer/log_buffer.go16
-rw-r--r--weed/util/log_buffer/log_read.go102
-rw-r--r--weed/util/sqlutil/splitter.go142
-rw-r--r--weed/util/sqlutil/splitter_test.go147
4 files changed, 407 insertions, 0 deletions
diff --git a/weed/util/log_buffer/log_buffer.go b/weed/util/log_buffer/log_buffer.go
index 8683dfffc..15ea062c6 100644
--- a/weed/util/log_buffer/log_buffer.go
+++ b/weed/util/log_buffer/log_buffer.go
@@ -24,6 +24,7 @@ type dataToFlush struct {
}
type EachLogEntryFuncType func(logEntry *filer_pb.LogEntry) (isDone bool, err error)
+type EachLogEntryWithBatchIndexFuncType func(logEntry *filer_pb.LogEntry, batchIndex int64) (isDone bool, err error)
type LogFlushFuncType func(logBuffer *LogBuffer, startTime, stopTime time.Time, buf []byte)
type LogReadFromDiskFuncType func(startPosition MessagePosition, stopTsNs int64, eachLogEntryFn EachLogEntryFuncType) (lastReadPosition MessagePosition, isDone bool, err error)
@@ -63,6 +64,7 @@ func NewLogBuffer(name string, flushInterval time.Duration, flushFn LogFlushFunc
notifyFn: notifyFn,
flushChan: make(chan *dataToFlush, 256),
isStopping: new(atomic.Bool),
+ batchIndex: time.Now().UnixNano(), // Initialize with creation time for uniqueness across restarts
}
go lb.loopFlush()
go lb.loopInterval()
@@ -343,6 +345,20 @@ func (logBuffer *LogBuffer) ReleaseMemory(b *bytes.Buffer) {
bufferPool.Put(b)
}
+// GetName returns the log buffer name for metadata tracking
+func (logBuffer *LogBuffer) GetName() string {
+ logBuffer.RLock()
+ defer logBuffer.RUnlock()
+ return logBuffer.name
+}
+
+// GetBatchIndex returns the current batch index for metadata tracking
+func (logBuffer *LogBuffer) GetBatchIndex() int64 {
+ logBuffer.RLock()
+ defer logBuffer.RUnlock()
+ return logBuffer.batchIndex
+}
+
var bufferPool = sync.Pool{
New: func() interface{} {
return new(bytes.Buffer)
diff --git a/weed/util/log_buffer/log_read.go b/weed/util/log_buffer/log_read.go
index cf83de1e5..0ebcc7cc9 100644
--- a/weed/util/log_buffer/log_read.go
+++ b/weed/util/log_buffer/log_read.go
@@ -130,3 +130,105 @@ func (logBuffer *LogBuffer) LoopProcessLogData(readerName string, startPosition
}
}
+
+// LoopProcessLogDataWithBatchIndex is similar to LoopProcessLogData but provides batchIndex to the callback
+func (logBuffer *LogBuffer) LoopProcessLogDataWithBatchIndex(readerName string, startPosition MessagePosition, stopTsNs int64,
+ waitForDataFn func() bool, eachLogDataFn EachLogEntryWithBatchIndexFuncType) (lastReadPosition MessagePosition, isDone bool, err error) {
+ // loop through all messages
+ var bytesBuf *bytes.Buffer
+ var batchIndex int64
+ lastReadPosition = startPosition
+ var entryCounter int64
+ defer func() {
+ if bytesBuf != nil {
+ logBuffer.ReleaseMemory(bytesBuf)
+ }
+ // println("LoopProcessLogDataWithBatchIndex", readerName, "sent messages total", entryCounter)
+ }()
+
+ for {
+
+ if bytesBuf != nil {
+ logBuffer.ReleaseMemory(bytesBuf)
+ }
+ bytesBuf, batchIndex, err = logBuffer.ReadFromBuffer(lastReadPosition)
+ if err == ResumeFromDiskError {
+ time.Sleep(1127 * time.Millisecond)
+ return lastReadPosition, isDone, ResumeFromDiskError
+ }
+ readSize := 0
+ if bytesBuf != nil {
+ readSize = bytesBuf.Len()
+ }
+ glog.V(4).Infof("%s ReadFromBuffer at %v batch %d. Read bytes %v batch %d", readerName, lastReadPosition, lastReadPosition.BatchIndex, readSize, batchIndex)
+ if bytesBuf == nil {
+ if batchIndex >= 0 {
+ lastReadPosition = NewMessagePosition(lastReadPosition.UnixNano(), batchIndex)
+ }
+ if stopTsNs != 0 {
+ isDone = true
+ return
+ }
+ lastTsNs := logBuffer.LastTsNs.Load()
+
+ for lastTsNs == logBuffer.LastTsNs.Load() {
+ if waitForDataFn() {
+ continue
+ } else {
+ isDone = true
+ return
+ }
+ }
+ if logBuffer.IsStopping() {
+ isDone = true
+ return
+ }
+ continue
+ }
+
+ buf := bytesBuf.Bytes()
+ // fmt.Printf("ReadFromBuffer %s by %v size %d\n", readerName, lastReadPosition, len(buf))
+
+ batchSize := 0
+
+ for pos := 0; pos+4 < len(buf); {
+
+ size := util.BytesToUint32(buf[pos : pos+4])
+ if pos+4+int(size) > len(buf) {
+ err = ResumeError
+ glog.Errorf("LoopProcessLogDataWithBatchIndex: %s read buffer %v read %d entries [%d,%d) from [0,%d)", readerName, lastReadPosition, batchSize, pos, pos+int(size)+4, len(buf))
+ return
+ }
+ entryData := buf[pos+4 : pos+4+int(size)]
+
+ logEntry := &filer_pb.LogEntry{}
+ if err = proto.Unmarshal(entryData, logEntry); err != nil {
+ glog.Errorf("unexpected unmarshal mq_pb.Message: %v", err)
+ pos += 4 + int(size)
+ continue
+ }
+ if stopTsNs != 0 && logEntry.TsNs > stopTsNs {
+ isDone = true
+ // println("stopTsNs", stopTsNs, "logEntry.TsNs", logEntry.TsNs)
+ return
+ }
+ lastReadPosition = NewMessagePosition(logEntry.TsNs, batchIndex)
+
+ if isDone, err = eachLogDataFn(logEntry, batchIndex); err != nil {
+ glog.Errorf("LoopProcessLogDataWithBatchIndex: %s process log entry %d %v: %v", readerName, batchSize+1, logEntry, err)
+ return
+ }
+ if isDone {
+ glog.V(0).Infof("LoopProcessLogDataWithBatchIndex: %s process log entry %d", readerName, batchSize+1)
+ return
+ }
+
+ pos += 4 + int(size)
+ batchSize++
+ entryCounter++
+
+ }
+
+ }
+
+}
diff --git a/weed/util/sqlutil/splitter.go b/weed/util/sqlutil/splitter.go
new file mode 100644
index 000000000..098a7ecb3
--- /dev/null
+++ b/weed/util/sqlutil/splitter.go
@@ -0,0 +1,142 @@
+package sqlutil
+
+import (
+ "strings"
+)
+
+// SplitStatements splits a query string into individual SQL statements.
+// This robust implementation handles SQL comments, quoted strings, and escaped characters.
+//
+// Features:
+// - Handles single-line comments (-- comment)
+// - Handles multi-line comments (/* comment */)
+// - Properly escapes single quotes in strings ('don”t')
+// - Properly escapes double quotes in identifiers ("column""name")
+// - Ignores semicolons within quoted strings and comments
+// - Returns clean, trimmed statements with empty statements filtered out
+func SplitStatements(query string) []string {
+ var statements []string
+ var current strings.Builder
+
+ query = strings.TrimSpace(query)
+ if query == "" {
+ return []string{}
+ }
+
+ runes := []rune(query)
+ i := 0
+
+ for i < len(runes) {
+ char := runes[i]
+
+ // Handle single-line comments (-- comment)
+ if char == '-' && i+1 < len(runes) && runes[i+1] == '-' {
+ // Skip the entire comment without including it in any statement
+ for i < len(runes) && runes[i] != '\n' && runes[i] != '\r' {
+ i++
+ }
+ // Skip the newline if present
+ if i < len(runes) {
+ i++
+ }
+ continue
+ }
+
+ // Handle multi-line comments (/* comment */)
+ if char == '/' && i+1 < len(runes) && runes[i+1] == '*' {
+ // Skip the /* opening
+ i++
+ i++
+
+ // Skip to end of comment or end of input without including content
+ for i < len(runes) {
+ if runes[i] == '*' && i+1 < len(runes) && runes[i+1] == '/' {
+ i++ // Skip the *
+ i++ // Skip the /
+ break
+ }
+ i++
+ }
+ continue
+ }
+
+ // Handle single-quoted strings
+ if char == '\'' {
+ current.WriteRune(char)
+ i++
+
+ for i < len(runes) {
+ char = runes[i]
+ current.WriteRune(char)
+
+ if char == '\'' {
+ // Check if it's an escaped quote
+ if i+1 < len(runes) && runes[i+1] == '\'' {
+ i++ // Skip the next quote (it's escaped)
+ if i < len(runes) {
+ current.WriteRune(runes[i])
+ }
+ } else {
+ break // End of string
+ }
+ }
+ i++
+ }
+ i++
+ continue
+ }
+
+ // Handle double-quoted identifiers
+ if char == '"' {
+ current.WriteRune(char)
+ i++
+
+ for i < len(runes) {
+ char = runes[i]
+ current.WriteRune(char)
+
+ if char == '"' {
+ // Check if it's an escaped quote
+ if i+1 < len(runes) && runes[i+1] == '"' {
+ i++ // Skip the next quote (it's escaped)
+ if i < len(runes) {
+ current.WriteRune(runes[i])
+ }
+ } else {
+ break // End of identifier
+ }
+ }
+ i++
+ }
+ i++
+ continue
+ }
+
+ // Handle semicolon (statement separator)
+ if char == ';' {
+ stmt := strings.TrimSpace(current.String())
+ if stmt != "" {
+ statements = append(statements, stmt)
+ }
+ current.Reset()
+ } else {
+ current.WriteRune(char)
+ }
+ i++
+ }
+
+ // Add any remaining statement
+ if current.Len() > 0 {
+ stmt := strings.TrimSpace(current.String())
+ if stmt != "" {
+ statements = append(statements, stmt)
+ }
+ }
+
+ // If no statements found, return the original query as a single statement
+ if len(statements) == 0 {
+ return []string{strings.TrimSpace(strings.TrimSuffix(strings.TrimSpace(query), ";"))}
+ }
+
+ return statements
+}
diff --git a/weed/util/sqlutil/splitter_test.go b/weed/util/sqlutil/splitter_test.go
new file mode 100644
index 000000000..91fac6196
--- /dev/null
+++ b/weed/util/sqlutil/splitter_test.go
@@ -0,0 +1,147 @@
+package sqlutil
+
+import (
+ "reflect"
+ "testing"
+)
+
+func TestSplitStatements(t *testing.T) {
+ tests := []struct {
+ name string
+ input string
+ expected []string
+ }{
+ {
+ name: "Simple single statement",
+ input: "SELECT * FROM users",
+ expected: []string{"SELECT * FROM users"},
+ },
+ {
+ name: "Multiple statements",
+ input: "SELECT * FROM users; SELECT * FROM orders;",
+ expected: []string{"SELECT * FROM users", "SELECT * FROM orders"},
+ },
+ {
+ name: "Semicolon in single quotes",
+ input: "SELECT 'hello;world' FROM users; SELECT * FROM orders;",
+ expected: []string{"SELECT 'hello;world' FROM users", "SELECT * FROM orders"},
+ },
+ {
+ name: "Semicolon in double quotes",
+ input: `SELECT "column;name" FROM users; SELECT * FROM orders;`,
+ expected: []string{`SELECT "column;name" FROM users`, "SELECT * FROM orders"},
+ },
+ {
+ name: "Escaped quotes in strings",
+ input: `SELECT 'don''t split; here' FROM users; SELECT * FROM orders;`,
+ expected: []string{`SELECT 'don''t split; here' FROM users`, "SELECT * FROM orders"},
+ },
+ {
+ name: "Escaped quotes in identifiers",
+ input: `SELECT "column""name" FROM users; SELECT * FROM orders;`,
+ expected: []string{`SELECT "column""name" FROM users`, "SELECT * FROM orders"},
+ },
+ {
+ name: "Single line comment",
+ input: "SELECT * FROM users; -- This is a comment\nSELECT * FROM orders;",
+ expected: []string{"SELECT * FROM users", "SELECT * FROM orders"},
+ },
+ {
+ name: "Single line comment with semicolon",
+ input: "SELECT * FROM users; -- Comment with; semicolon\nSELECT * FROM orders;",
+ expected: []string{"SELECT * FROM users", "SELECT * FROM orders"},
+ },
+ {
+ name: "Multi-line comment",
+ input: "SELECT * FROM users; /* Multi-line\ncomment */ SELECT * FROM orders;",
+ expected: []string{"SELECT * FROM users", "SELECT * FROM orders"},
+ },
+ {
+ name: "Multi-line comment with semicolon",
+ input: "SELECT * FROM users; /* Comment with; semicolon */ SELECT * FROM orders;",
+ expected: []string{"SELECT * FROM users", "SELECT * FROM orders"},
+ },
+ {
+ name: "Complex mixed case",
+ input: `SELECT 'test;string', "quoted;id" FROM users; -- Comment; here
+ /* Another; comment */
+ INSERT INTO users VALUES ('name''s value', "id""field");`,
+ expected: []string{
+ `SELECT 'test;string', "quoted;id" FROM users`,
+ `INSERT INTO users VALUES ('name''s value', "id""field")`,
+ },
+ },
+ {
+ name: "Empty statements filtered",
+ input: "SELECT * FROM users;;; SELECT * FROM orders;",
+ expected: []string{"SELECT * FROM users", "SELECT * FROM orders"},
+ },
+ {
+ name: "Whitespace handling",
+ input: " SELECT * FROM users ; SELECT * FROM orders ; ",
+ expected: []string{"SELECT * FROM users", "SELECT * FROM orders"},
+ },
+ {
+ name: "Single statement without semicolon",
+ input: "SELECT * FROM users",
+ expected: []string{"SELECT * FROM users"},
+ },
+ {
+ name: "Empty query",
+ input: "",
+ expected: []string{},
+ },
+ {
+ name: "Only whitespace",
+ input: " \n\t ",
+ expected: []string{},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := SplitStatements(tt.input)
+ if !reflect.DeepEqual(result, tt.expected) {
+ t.Errorf("SplitStatements() = %v, expected %v", result, tt.expected)
+ }
+ })
+ }
+}
+
+func TestSplitStatements_EdgeCases(t *testing.T) {
+ tests := []struct {
+ name string
+ input string
+ expected []string
+ }{
+ {
+ name: "Nested comments are not supported but handled gracefully",
+ input: "SELECT * FROM users; /* Outer /* inner */ comment */ SELECT * FROM orders;",
+ expected: []string{"SELECT * FROM users", "comment */ SELECT * FROM orders"},
+ },
+ {
+ name: "Unterminated string (malformed SQL)",
+ input: "SELECT 'unterminated string; SELECT * FROM orders;",
+ expected: []string{"SELECT 'unterminated string; SELECT * FROM orders;"},
+ },
+ {
+ name: "Unterminated comment (malformed SQL)",
+ input: "SELECT * FROM users; /* unterminated comment",
+ expected: []string{"SELECT * FROM users"},
+ },
+ {
+ name: "Multiple semicolons in quotes",
+ input: "SELECT ';;;' FROM users; SELECT ';;;' FROM orders;",
+ expected: []string{"SELECT ';;;' FROM users", "SELECT ';;;' FROM orders"},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := SplitStatements(tt.input)
+ if !reflect.DeepEqual(result, tt.expected) {
+ t.Errorf("SplitStatements() = %v, expected %v", result, tt.expected)
+ }
+ })
+ }
+}