aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2020-04-29 17:40:08 -0700
committerChris Lu <chris.lu@gmail.com>2020-04-29 17:40:08 -0700
commit9e72e9e4b8d58e0f6f99088f0e449d1c9c55e562 (patch)
tree4b3b24e0bc17b9597010ce18892a5cb8264476d7
parented3cf811f576e2dd9fd1c1fb0df967d7fb9e6f1c (diff)
downloadseaweedfs-9e72e9e4b8d58e0f6f99088f0e449d1c9c55e562.tar.xz
seaweedfs-9e72e9e4b8d58e0f6f99088f0e449d1c9c55e562.zip
able to subscribe any topic from any point of time
-rw-r--r--weed/filer2/filer_notify.go4
-rw-r--r--weed/filer2/reader_at.go54
-rw-r--r--weed/filer2/stream.go12
-rw-r--r--weed/filesys/dir.go3
-rw-r--r--weed/messaging/broker/broker_grpc_server_subscribe.go45
-rw-r--r--weed/pb/filer_pb/filer_client.go16
-rw-r--r--weed/pb/filer_pb/filer_client_bfs.go3
-rw-r--r--weed/s3api/filer_util.go3
-rw-r--r--weed/server/webdav_server.go3
-rw-r--r--weed/shell/command_bucket_list.go3
-rw-r--r--weed/shell/command_fs_du.go3
-rw-r--r--weed/shell/command_fs_ls.go5
-rw-r--r--weed/shell/command_fs_tree.go6
13 files changed, 117 insertions, 43 deletions
diff --git a/weed/filer2/filer_notify.go b/weed/filer2/filer_notify.go
index 6b83d8e63..28ade51cc 100644
--- a/weed/filer2/filer_notify.go
+++ b/weed/filer2/filer_notify.go
@@ -109,7 +109,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
}
// println("processing", hourMinuteEntry.FullPath)
chunkedFileReader := NewChunkStreamReaderFromFiler(f.MasterClient, hourMinuteEntry.Chunks)
- if err := readEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
+ if err := ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
chunkedFileReader.Close()
if err == io.EOF {
break
@@ -123,7 +123,7 @@ func (f *Filer) ReadPersistedLogBuffer(startTime time.Time, eachLogEntryFn func(
return nil
}
-func readEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error {
+func ReadEachLogEntry(r io.Reader, sizeBuf []byte, ns int64, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) error {
for {
n, err := r.Read(sizeBuf)
if err != nil {
diff --git a/weed/filer2/reader_at.go b/weed/filer2/reader_at.go
index f56ef6388..a9772da5b 100644
--- a/weed/filer2/reader_at.go
+++ b/weed/filer2/reader_at.go
@@ -27,34 +27,40 @@ type ChunkReadAt struct {
// var _ = io.ReaderAt(&ChunkReadAt{})
+type LookupFileIdFunctionType func(fileId string) (targetUrl string, err error)
+
+func LookupFn(filerClient filer_pb.FilerClient) LookupFileIdFunctionType {
+ return func(fileId string) (targetUrl string, err error) {
+ err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
+ vid := VolumeId(fileId)
+ resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
+ VolumeIds: []string{vid},
+ })
+ if err != nil {
+ return err
+ }
+
+ locations := resp.LocationsMap[vid]
+ if locations == nil || len(locations.Locations) == 0 {
+ glog.V(0).Infof("failed to locate %s", fileId)
+ return fmt.Errorf("failed to locate %s", fileId)
+ }
+
+ volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
+
+ targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
+
+ return nil
+ })
+ return
+ }
+}
+
func NewChunkReaderAtFromClient(filerClient filer_pb.FilerClient, chunkViews []*ChunkView, chunkCache *chunk_cache.ChunkCache) *ChunkReadAt {
return &ChunkReadAt{
chunkViews: chunkViews,
- lookupFileId: func(fileId string) (targetUrl string, err error) {
- err = filerClient.WithFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- vid := VolumeId(fileId)
- resp, err := client.LookupVolume(context.Background(), &filer_pb.LookupVolumeRequest{
- VolumeIds: []string{vid},
- })
- if err != nil {
- return err
- }
-
- locations := resp.LocationsMap[vid]
- if locations == nil || len(locations.Locations) == 0 {
- glog.V(0).Infof("failed to locate %s", fileId)
- return fmt.Errorf("failed to locate %s", fileId)
- }
-
- volumeServerAddress := filerClient.AdjustedUrl(locations.Locations[0].Url)
-
- targetUrl = fmt.Sprintf("http://%s/%s", volumeServerAddress, fileId)
-
- return nil
- })
- return
- },
+ lookupFileId: LookupFn(filerClient),
bufferOffset: -1,
chunkCache: chunkCache,
}
diff --git a/weed/filer2/stream.go b/weed/filer2/stream.go
index 4e785fade..4c8213b07 100644
--- a/weed/filer2/stream.go
+++ b/weed/filer2/stream.go
@@ -81,7 +81,7 @@ type ChunkStreamReader struct {
bufferOffset int64
bufferPos int
chunkIndex int
- lookupFileId func(fileId string) (targetUrl string, err error)
+ lookupFileId LookupFileIdFunctionType
}
var _ = io.ReadSeeker(&ChunkStreamReader{})
@@ -98,6 +98,16 @@ func NewChunkStreamReaderFromFiler(masterClient *wdclient.MasterClient, chunks [
}
}
+func NewChunkStreamReader(filerClient filer_pb.FilerClient, chunks []*filer_pb.FileChunk) *ChunkStreamReader {
+
+ chunkViews := ViewFromChunks(chunks, 0, math.MaxInt32)
+
+ return &ChunkStreamReader{
+ chunkViews: chunkViews,
+ lookupFileId: LookupFn(filerClient),
+ }
+}
+
func (c *ChunkStreamReader) Read(p []byte) (n int, err error) {
for n < len(p) {
if c.isBufferEmpty() {
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index b91aa3a72..c3dab919c 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -263,7 +263,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
glog.V(3).Infof("dir ReadDirAll %s", dir.FullPath())
cacheTtl := 5 * time.Minute
- processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) {
+ processEachEntryFn := func(entry *filer_pb.Entry, isLast bool) error {
fullpath := util.NewFullPath(dir.FullPath(), entry.Name)
inode := fullpath.AsInode()
if entry.IsDirectory {
@@ -274,6 +274,7 @@ func (dir *Dir) ReadDirAll(ctx context.Context) (ret []fuse.Dirent, err error) {
ret = append(ret, dirent)
}
dir.wfs.cacheSet(fullpath, entry, cacheTtl)
+ return nil
}
if dir.wfs.option.AsyncMetaDataCaching {
diff --git a/weed/messaging/broker/broker_grpc_server_subscribe.go b/weed/messaging/broker/broker_grpc_server_subscribe.go
index 290c84e34..c5e033420 100644
--- a/weed/messaging/broker/broker_grpc_server_subscribe.go
+++ b/weed/messaging/broker/broker_grpc_server_subscribe.go
@@ -3,10 +3,12 @@ package broker
import (
"fmt"
"io"
+ "strings"
"time"
"github.com/golang/protobuf/proto"
+ "github.com/chrislusf/seaweedfs/weed/filer2"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"github.com/chrislusf/seaweedfs/weed/pb/messaging_pb"
@@ -57,6 +59,7 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
case messaging_pb.SubscriberMessage_InitMessage_LATEST:
case messaging_pb.SubscriberMessage_InitMessage_EARLIEST:
}
+ var processedTsNs int64
// how to process each message
// an error returned will end the subscription
@@ -81,9 +84,18 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
glog.Errorf("sending %d bytes to %s: %s", len(m.Value), subscriberId, err)
return err
}
+ processedTsNs = logEntry.TsNs
return nil
}
+ if err := broker.readPersistedLogBuffer(&tp, lastReadTime, eachLogEntryFn); err != nil {
+ return err
+ }
+
+ if processedTsNs != 0 {
+ lastReadTime = time.Unix(0, processedTsNs)
+ }
+
messageCount, err = lock.logBuffer.LoopProcessLogData(lastReadTime, func() bool {
lock.Mutex.Lock()
lock.cond.Wait()
@@ -94,3 +106,36 @@ func (broker *MessageBroker) Subscribe(stream messaging_pb.SeaweedMessaging_Subs
return err
}
+
+func (broker *MessageBroker) readPersistedLogBuffer(tp *TopicPartition, startTime time.Time, eachLogEntryFn func(logEntry *filer_pb.LogEntry) error) (err error) {
+ startDate := fmt.Sprintf("%04d-%02d-%02d", startTime.Year(), startTime.Month(), startTime.Day())
+ startHourMinute := fmt.Sprintf("%02d-%02d.segment", startTime.Hour(), startTime.Minute())
+
+ sizeBuf := make([]byte, 4)
+ startTsNs := startTime.UnixNano()
+
+ topicDir := fmt.Sprintf("/topics/%s/%s", tp.Namespace, tp.Topic)
+
+ return filer_pb.List(broker, topicDir, "", func(dayEntry *filer_pb.Entry, isLast bool) error {
+ dayDir := fmt.Sprintf("%s/%s", topicDir, dayEntry.Name)
+ return filer_pb.List(broker, dayDir, "", func(hourMinuteEntry *filer_pb.Entry, isLast bool) error {
+ if dayEntry.Name == startDate {
+ if strings.Compare(hourMinuteEntry.Name, startHourMinute) < 0 {
+ return nil
+ }
+ }
+ // println("processing", hourMinuteEntry.FullPath)
+ chunkedFileReader := filer2.NewChunkStreamReader(broker, hourMinuteEntry.Chunks)
+ defer chunkedFileReader.Close()
+ if err := filer2.ReadEachLogEntry(chunkedFileReader, sizeBuf, startTsNs, eachLogEntryFn); err != nil {
+ chunkedFileReader.Close()
+ if err == io.EOF {
+ return nil
+ }
+ return fmt.Errorf("reading %s/%s: %v", dayDir, hourMinuteEntry.Name, err)
+ }
+ return nil
+ }, "", false, 24*60)
+ }, startDate, true, 366)
+
+}
diff --git a/weed/pb/filer_pb/filer_client.go b/weed/pb/filer_pb/filer_client.go
index c5d863262..73b66472d 100644
--- a/weed/pb/filer_pb/filer_client.go
+++ b/weed/pb/filer_pb/filer_client.go
@@ -56,19 +56,21 @@ func GetEntry(filerClient FilerClient, fullFilePath util.FullPath) (entry *Entry
return
}
-func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool)) (err error) {
+type EachEntryFunciton func(entry *Entry, isLast bool) error
+
+func ReadDirAllEntries(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton) (err error) {
return doList(filerClient, fullDirPath, prefix, fn, "", false, math.MaxUint32)
}
-func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn func(entry *Entry, isLast bool), startFrom string, inclusive bool, limit uint32) (err error) {
+func List(filerClient FilerClient, parentDirectoryPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
return doList(filerClient, util.FullPath(parentDirectoryPath), prefix, fn, startFrom, inclusive, limit)
}
-func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn func(entry *Entry, isLast bool), startFrom string, inclusive bool, limit uint32) (err error) {
+func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, fn EachEntryFunciton, startFrom string, inclusive bool, limit uint32) (err error) {
err = filerClient.WithFilerClient(func(client SeaweedFilerClient) error {
@@ -92,7 +94,9 @@ func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, f
if recvErr != nil {
if recvErr == io.EOF {
if prevEntry != nil {
- fn(prevEntry, true)
+ if err := fn(prevEntry, true); err != nil {
+ return err
+ }
}
break
} else {
@@ -100,7 +104,9 @@ func doList(filerClient FilerClient, fullDirPath util.FullPath, prefix string, f
}
}
if prevEntry != nil {
- fn(prevEntry, false)
+ if err := fn(prevEntry, false); err != nil {
+ return err
+ }
}
prevEntry = resp.Entry
}
diff --git a/weed/pb/filer_pb/filer_client_bfs.go b/weed/pb/filer_pb/filer_client_bfs.go
index 7c9a8ae28..4e5b65f12 100644
--- a/weed/pb/filer_pb/filer_client_bfs.go
+++ b/weed/pb/filer_pb/filer_client_bfs.go
@@ -45,7 +45,7 @@ func TraverseBfs(filerClient FilerClient, parentPath util.FullPath, fn func(pare
func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queue *util.Queue, jobQueueWg *sync.WaitGroup, fn func(parentPath util.FullPath, entry *Entry)) (err error) {
- return ReadDirAllEntries(filerClient, parentPath, "", func(entry *Entry, isLast bool) {
+ return ReadDirAllEntries(filerClient, parentPath, "", func(entry *Entry, isLast bool) error {
fn(parentPath, entry)
@@ -57,6 +57,7 @@ func processOneDirectory(filerClient FilerClient, parentPath util.FullPath, queu
jobQueueWg.Add(1)
queue.Enqueue(util.FullPath(subDir))
}
+ return nil
})
}
diff --git a/weed/s3api/filer_util.go b/weed/s3api/filer_util.go
index 51249cf39..7f49c320e 100644
--- a/weed/s3api/filer_util.go
+++ b/weed/s3api/filer_util.go
@@ -23,8 +23,9 @@ func (s3a *S3ApiServer) mkFile(parentDirectoryPath string, fileName string, chun
func (s3a *S3ApiServer) list(parentDirectoryPath, prefix, startFrom string, inclusive bool, limit uint32) (entries []*filer_pb.Entry, err error) {
- err = filer_pb.List(s3a, parentDirectoryPath, prefix, func(entry *filer_pb.Entry, isLast bool) {
+ err = filer_pb.List(s3a, parentDirectoryPath, prefix, func(entry *filer_pb.Entry, isLast bool) error {
entries = append(entries, entry)
+ return nil
}, startFrom, inclusive, limit)
return
diff --git a/weed/server/webdav_server.go b/weed/server/webdav_server.go
index f195b09f7..a4a1d8b8b 100644
--- a/weed/server/webdav_server.go
+++ b/weed/server/webdav_server.go
@@ -511,7 +511,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
dir, _ := util.FullPath(f.name).DirAndName()
- err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) {
+ err = filer_pb.ReadDirAllEntries(f.fs, util.FullPath(dir), "", func(entry *filer_pb.Entry, isLast bool) error {
fi := FileInfo{
size: int64(filer2.TotalSize(entry.GetChunks())),
name: entry.Name,
@@ -525,6 +525,7 @@ func (f *WebDavFile) Readdir(count int) (ret []os.FileInfo, err error) {
}
glog.V(4).Infof("entry: %v", fi.name)
ret = append(ret, &fi)
+ return nil
})
old := f.off
diff --git a/weed/shell/command_bucket_list.go b/weed/shell/command_bucket_list.go
index b982ff646..2e446b6b2 100644
--- a/weed/shell/command_bucket_list.go
+++ b/weed/shell/command_bucket_list.go
@@ -45,12 +45,13 @@ func (c *commandBucketList) Do(args []string, commandEnv *CommandEnv, writer io.
return fmt.Errorf("read buckets: %v", err)
}
- err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) {
+ err = filer_pb.List(commandEnv, filerBucketsPath, "", func(entry *filer_pb.Entry, isLast bool) error {
if entry.Attributes.Replication == "" || entry.Attributes.Replication == "000" {
fmt.Fprintf(writer, " %s\n", entry.Name)
} else {
fmt.Fprintf(writer, " %s\t\t\treplication: %s\n", entry.Name, entry.Attributes.Replication)
}
+ return nil
}, "", false, math.MaxUint32)
if err != nil {
return fmt.Errorf("list buckets under %v: %v", filerBucketsPath, err)
diff --git a/weed/shell/command_fs_du.go b/weed/shell/command_fs_du.go
index 08c553e7c..96551dd5a 100644
--- a/weed/shell/command_fs_du.go
+++ b/weed/shell/command_fs_du.go
@@ -54,7 +54,7 @@ func (c *commandFsDu) Do(args []string, commandEnv *CommandEnv, writer io.Writer
func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir, name string) (blockCount, byteCount uint64, err error) {
- err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) {
+ err = filer_pb.ReadDirAllEntries(filerClient, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) error {
var fileBlockCount, fileByteCount uint64
@@ -78,6 +78,7 @@ func duTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, dir
if name != "" && !entry.IsDirectory {
fmt.Fprintf(writer, "block:%4d\tbyte:%10d\t%s/%s\n", fileBlockCount, fileByteCount, dir, entry.Name)
}
+ return nil
})
return
}
diff --git a/weed/shell/command_fs_ls.go b/weed/shell/command_fs_ls.go
index 569b3bb9b..36133992f 100644
--- a/weed/shell/command_fs_ls.go
+++ b/weed/shell/command_fs_ls.go
@@ -62,10 +62,10 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer
dir, name := util.FullPath(path).DirAndName()
entryCount := 0
- err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) {
+ err = filer_pb.ReadDirAllEntries(commandEnv, util.FullPath(dir), name, func(entry *filer_pb.Entry, isLast bool) error {
if !showHidden && strings.HasPrefix(entry.Name, ".") {
- return
+ return nil
}
entryCount++
@@ -100,6 +100,7 @@ func (c *commandFsLs) Do(args []string, commandEnv *CommandEnv, writer io.Writer
fmt.Fprintf(writer, "%s\n", entry.Name)
}
+ return nil
})
if isLongFormat && err == nil {
diff --git a/weed/shell/command_fs_tree.go b/weed/shell/command_fs_tree.go
index b0752ea03..a8c5b2018 100644
--- a/weed/shell/command_fs_tree.go
+++ b/weed/shell/command_fs_tree.go
@@ -51,10 +51,10 @@ func treeTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, d
prefix.addMarker(level)
- err = filer_pb.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) {
+ err = filer_pb.ReadDirAllEntries(filerClient, dir, name, func(entry *filer_pb.Entry, isLast bool) error {
if level < 0 && name != "" {
if entry.Name != name {
- return
+ return nil
}
}
@@ -70,7 +70,7 @@ func treeTraverseDirectory(writer io.Writer, filerClient filer_pb.FilerClient, d
} else {
fileCount++
}
-
+ return nil
})
return
}