aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorChris Lu <chris.lu@gmail.com>2019-10-22 00:50:30 -0700
committerChris Lu <chris.lu@gmail.com>2019-10-22 00:50:30 -0700
commitfc412e428bd6816f1d53ed9017d1ac7ee02db817 (patch)
treea0ee11f646b6072ab83fb9bf8f96f56b8b1a343a
parentc9a183eb69205e7f821615ced2bc1f11f47d6e40 (diff)
downloadseaweedfs-fc412e428bd6816f1d53ed9017d1ac7ee02db817.tar.xz
seaweedfs-fc412e428bd6816f1d53ed9017d1ac7ee02db817.zip
refactor ScanVolumeFileFrom()
-rw-r--r--unmaintained/remove_duplicate_fids/remove_duplicate_fids.go2
-rw-r--r--unmaintained/see_dat/see_dat.go2
-rw-r--r--weed/command/export.go2
-rw-r--r--weed/command/fix.go2
-rw-r--r--weed/server/volume_grpc_tail.go70
-rw-r--r--weed/storage/volume_backup.go2
-rw-r--r--weed/storage/volume_read_write.go42
-rw-r--r--weed/storage/volume_vacuum.go2
8 files changed, 55 insertions, 69 deletions
diff --git a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go
index 4b37a64fb..bbb6f6d9a 100644
--- a/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go
+++ b/unmaintained/remove_duplicate_fids/remove_duplicate_fids.go
@@ -40,7 +40,7 @@ func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool {
return true
}
-func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64) error {
+func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
if scanner.dat == nil {
newDatFile, err := os.Create(filepath.Join(*volumePath, "dat_fixed"))
diff --git a/unmaintained/see_dat/see_dat.go b/unmaintained/see_dat/see_dat.go
index e07704fc6..84a06c625 100644
--- a/unmaintained/see_dat/see_dat.go
+++ b/unmaintained/see_dat/see_dat.go
@@ -28,7 +28,7 @@ func (scanner *VolumeFileScanner4SeeDat) ReadNeedleBody() bool {
return true
}
-func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64) error {
+func (scanner *VolumeFileScanner4SeeDat) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
t := time.Unix(int64(n.AppendAtNs)/int64(time.Second), int64(n.AppendAtNs)%int64(time.Second))
glog.V(0).Infof("%d,%s%x offset %d size %d cookie %x appendedAt %v", *volumeId, n.Id, n.Cookie, offset, n.Size, n.Cookie, t)
return nil
diff --git a/weed/command/export.go b/weed/command/export.go
index 7e94ec11c..d3a765e09 100644
--- a/weed/command/export.go
+++ b/weed/command/export.go
@@ -102,7 +102,7 @@ func (scanner *VolumeFileScanner4Export) ReadNeedleBody() bool {
return true
}
-func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64) error {
+func (scanner *VolumeFileScanner4Export) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
needleMap := scanner.needleMap
vid := scanner.vid
diff --git a/weed/command/fix.go b/weed/command/fix.go
index bf33490cc..2fbbca5e6 100644
--- a/weed/command/fix.go
+++ b/weed/command/fix.go
@@ -43,7 +43,7 @@ func (scanner *VolumeFileScanner4Fix) ReadNeedleBody() bool {
return false
}
-func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64) error {
+func (scanner *VolumeFileScanner4Fix) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
glog.V(2).Infof("key %d offset %d size %d disk_size %d gzip %v", n.Id, offset, n.Size, n.DiskSize(scanner.version), n.IsGzipped())
if n.Size > 0 && n.Size != types.TombstoneFileSize {
pe := scanner.nm.Put(n.Id, types.ToOffset(offset), n.Size)
diff --git a/weed/server/volume_grpc_tail.go b/weed/server/volume_grpc_tail.go
index 34c55a599..1e0c91274 100644
--- a/weed/server/volume_grpc_tail.go
+++ b/weed/server/volume_grpc_tail.go
@@ -67,34 +67,13 @@ func sendNeedlesSince(stream volume_server_pb.VolumeServer_VolumeTailSenderServe
return lastTimestampNs, sendErr
}
- err = storage.ScanVolumeFileNeedleFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error {
-
- isLastChunk := false
-
- // need to send body by chunks
- for i := 0; i < len(needleBody); i += BufferSizeLimit {
- stopOffset := i + BufferSizeLimit
- if stopOffset >= len(needleBody) {
- isLastChunk = true
- stopOffset = len(needleBody)
- }
-
- sendErr := stream.Send(&volume_server_pb.VolumeTailSenderResponse{
- NeedleHeader: needleHeader,
- NeedleBody: needleBody[i:stopOffset],
- IsLastChunk: isLastChunk,
- })
- if sendErr != nil {
- return sendErr
- }
- }
-
- lastProcessedTimestampNs = needleAppendAtNs
- return nil
+ scanner := &VolumeFileScanner4Tailing{
+ stream:stream,
+ }
- })
+ err = storage.ScanVolumeFileFrom(v.Version(), v.DataFile(), foundOffset.ToAcutalOffset(), scanner)
- return
+ return scanner.lastProcessedTimestampNs, err
}
@@ -115,3 +94,42 @@ func (vs *VolumeServer) VolumeTailReceiver(ctx context.Context, req *volume_serv
})
}
+
+// generate the volume idx
+type VolumeFileScanner4Tailing struct {
+ stream volume_server_pb.VolumeServer_VolumeTailSenderServer
+ lastProcessedTimestampNs uint64
+}
+
+func (scanner *VolumeFileScanner4Tailing) VisitSuperBlock(superBlock storage.SuperBlock) error {
+ return nil
+
+}
+func (scanner *VolumeFileScanner4Tailing) ReadNeedleBody() bool {
+ return true
+}
+
+func (scanner *VolumeFileScanner4Tailing) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
+ isLastChunk := false
+
+ // need to send body by chunks
+ for i := 0; i < len(needleBody); i += BufferSizeLimit {
+ stopOffset := i + BufferSizeLimit
+ if stopOffset >= len(needleBody) {
+ isLastChunk = true
+ stopOffset = len(needleBody)
+ }
+
+ sendErr := scanner.stream.Send(&volume_server_pb.VolumeTailSenderResponse{
+ NeedleHeader: needleHeader,
+ NeedleBody: needleBody[i:stopOffset],
+ IsLastChunk: isLastChunk,
+ })
+ if sendErr != nil {
+ return sendErr
+ }
+ }
+
+ scanner.lastProcessedTimestampNs = n.AppendAtNs
+ return nil
+}
diff --git a/weed/storage/volume_backup.go b/weed/storage/volume_backup.go
index 86d13da7a..f48ccbb68 100644
--- a/weed/storage/volume_backup.go
+++ b/weed/storage/volume_backup.go
@@ -251,7 +251,7 @@ func (scanner *VolumeFileScanner4GenIdx) ReadNeedleBody() bool {
return false
}
-func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64) error {
+func (scanner *VolumeFileScanner4GenIdx) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
if n.Size > 0 && n.Size != TombstoneFileSize {
return scanner.v.nm.Put(n.Id, ToOffset(offset), n.Size)
}
diff --git a/weed/storage/volume_read_write.go b/weed/storage/volume_read_write.go
index 767a318c8..7a216b77e 100644
--- a/weed/storage/volume_read_write.go
+++ b/weed/storage/volume_read_write.go
@@ -179,7 +179,7 @@ func (v *Volume) readNeedle(n *needle.Needle) (int, error) {
type VolumeFileScanner interface {
VisitSuperBlock(SuperBlock) error
ReadNeedleBody() bool
- VisitNeedle(n *needle.Needle, offset int64) error
+ VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error
}
func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
@@ -202,7 +202,7 @@ func ScanVolumeFile(dirname string, collection string, id needle.VolumeId,
}
func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64, volumeFileScanner VolumeFileScanner) (err error) {
- n, _, rest, e := needle.ReadNeedleHeader(dataFile, version, offset)
+ n, nh, rest, e := needle.ReadNeedleHeader(dataFile, version, offset)
if e != nil {
if e == io.EOF {
return nil
@@ -210,14 +210,15 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64,
return fmt.Errorf("cannot read %s at offset %d: %v", dataFile.Name(), offset, e)
}
for n != nil {
+ var needleBody []byte
if volumeFileScanner.ReadNeedleBody() {
- if _, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil {
+ if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil {
glog.V(0).Infof("cannot read needle body: %v", err)
//err = fmt.Errorf("cannot read needle body: %v", err)
//return
}
}
- err := volumeFileScanner.VisitNeedle(n, offset)
+ err := volumeFileScanner.VisitNeedle(n, offset, nh, needleBody)
if err == io.EOF {
return nil
}
@@ -237,36 +238,3 @@ func ScanVolumeFileFrom(version needle.Version, dataFile *os.File, offset int64,
}
return nil
}
-
-func ScanVolumeFileNeedleFrom(version needle.Version, dataFile *os.File, offset int64, fn func(needleHeader, needleBody []byte, needleAppendAtNs uint64) error) (err error) {
- n, nh, rest, e := needle.ReadNeedleHeader(dataFile, version, offset)
- if e != nil {
- if e == io.EOF {
- return nil
- }
- return fmt.Errorf("cannot read %s at offset %d: %v", dataFile.Name(), offset, e)
- }
- for n != nil {
- var needleBody []byte
- if needleBody, err = n.ReadNeedleBody(dataFile, version, offset+NeedleHeaderSize, rest); err != nil {
- glog.V(0).Infof("cannot read needle body: %v", err)
- //err = fmt.Errorf("cannot read needle body: %v", err)
- //return
- }
- err = fn(nh, needleBody, n.AppendAtNs)
- if err != nil {
- glog.V(0).Infof("visit needle error: %v", err)
- return
- }
- offset += NeedleHeaderSize + rest
- glog.V(4).Infof("==> new entry offset %d", offset)
- if n, nh, rest, err = needle.ReadNeedleHeader(dataFile, version, offset); err != nil {
- if err == io.EOF {
- return nil
- }
- return fmt.Errorf("cannot read needle header at offset %d: %v", offset, err)
- }
- glog.V(4).Infof("new entry needle size:%d rest:%d", n.Size, rest)
- }
- return nil
-}
diff --git a/weed/storage/volume_vacuum.go b/weed/storage/volume_vacuum.go
index 522d227c0..73314f022 100644
--- a/weed/storage/volume_vacuum.go
+++ b/weed/storage/volume_vacuum.go
@@ -286,7 +286,7 @@ func (scanner *VolumeFileScanner4Vacuum) ReadNeedleBody() bool {
return true
}
-func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset int64) error {
+func (scanner *VolumeFileScanner4Vacuum) VisitNeedle(n *needle.Needle, offset int64, needleHeader, needleBody []byte) error {
if n.HasTtl() && scanner.now >= n.LastModified+uint64(scanner.v.Ttl.Minutes()*60) {
return nil
}