aboutsummaryrefslogtreecommitdiff
path: root/weed/filesys
diff options
context:
space:
mode:
Diffstat (limited to 'weed/filesys')
-rw-r--r--weed/filesys/dirty_page.go8
-rw-r--r--weed/filesys/filehandle.go5
-rw-r--r--weed/filesys/wfs.go8
-rw-r--r--weed/filesys/wfs_deletion.go38
4 files changed, 11 insertions, 48 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go
index 696296e62..69f652ead 100644
--- a/weed/filesys/dirty_page.go
+++ b/weed/filesys/dirty_page.go
@@ -4,13 +4,14 @@ import (
"bytes"
"context"
"fmt"
+ "sync"
"sync/atomic"
"time"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/operation"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
- "sync"
+ "github.com/chrislusf/seaweedfs/weed/security"
)
type ContinuousDirtyPages struct {
@@ -164,6 +165,7 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Contex
func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) {
var fileId, host string
+ var auth security.EncodedJwt
if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
@@ -181,7 +183,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte
return err
}
- fileId, host = resp.FileId, resp.Url
+ fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth)
return nil
}); err != nil {
@@ -190,7 +192,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte
fileUrl := fmt.Sprintf("http://%s/%s", host, fileId)
bufReader := bytes.NewReader(buf)
- uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "")
+ uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, auth)
if err != nil {
glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err)
return nil, fmt.Errorf("upload data: %v", err)
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 0f6ca1164..3bca0e22e 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -10,6 +10,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
"github.com/seaweedfs/fuse"
"github.com/seaweedfs/fuse/fs"
+ "google.golang.org/grpc"
"net/http"
"strings"
"sync"
@@ -230,7 +231,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error {
})
}
-func deleteFileIds(ctx context.Context, client filer_pb.SeaweedFilerClient, fileIds []string) error {
+func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error {
var vids []string
for _, fileId := range fileIds {
@@ -267,7 +268,7 @@ func deleteFileIds(ctx context.Context, client filer_pb.SeaweedFilerClient, file
return m, err
}
- _, err := operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+ _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
return err
}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 969514a06..f7383582d 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -19,6 +19,7 @@ import (
type Option struct {
FilerGrpcAddress string
+ GrpcDialOption grpc.DialOption
FilerMountRootPath string
Collection string
Replication string
@@ -46,8 +47,6 @@ type WFS struct {
pathToHandleLock sync.Mutex
bufPool sync.Pool
- fileIdsDeletionChan chan []string
-
stats statsCache
}
type statsCache struct {
@@ -65,11 +64,8 @@ func NewSeaweedFileSystem(option *Option) *WFS {
return make([]byte, option.ChunkSizeLimit)
},
},
- fileIdsDeletionChan: make(chan []string, 32),
}
- go wfs.loopProcessingDeletion()
-
return wfs
}
@@ -82,7 +78,7 @@ func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro
return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := filer_pb.NewSeaweedFilerClient(grpcConnection)
return fn(client)
- }, wfs.option.FilerGrpcAddress)
+ }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption)
}
diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go
index f58ef24f4..90058d75a 100644
--- a/weed/filesys/wfs_deletion.go
+++ b/weed/filesys/wfs_deletion.go
@@ -2,39 +2,9 @@ package filesys
import (
"context"
- "time"
-
- "github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
)
-func (wfs *WFS) loopProcessingDeletion() {
-
- ticker := time.NewTicker(2 * time.Second)
-
- wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- var fileIds []string
- for {
- select {
- case fids := <-wfs.fileIdsDeletionChan:
- fileIds = append(fileIds, fids...)
- if len(fileIds) >= 1024 {
- glog.V(1).Infof("deleting fileIds len=%d", len(fileIds))
- deleteFileIds(context.Background(), client, fileIds)
- fileIds = fileIds[:0]
- }
- case <-ticker.C:
- if len(fileIds) > 0 {
- glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds))
- deleteFileIds(context.Background(), client, fileIds)
- fileIds = fileIds[:0]
- }
- }
- }
- })
-
-}
-
func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
if len(chunks) == 0 {
return
@@ -45,14 +15,8 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) {
fileIds = append(fileIds, chunk.FileId)
}
- var async = false
- if async {
- wfs.fileIdsDeletionChan <- fileIds
- return
- }
-
wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error {
- deleteFileIds(context.Background(), client, fileIds)
+ deleteFileIds(context.Background(), wfs.option.GrpcDialOption, client, fileIds)
return nil
})
}