diff options
Diffstat (limited to 'weed/filesys')
| -rw-r--r-- | weed/filesys/dirty_page.go | 8 | ||||
| -rw-r--r-- | weed/filesys/filehandle.go | 5 | ||||
| -rw-r--r-- | weed/filesys/wfs.go | 8 | ||||
| -rw-r--r-- | weed/filesys/wfs_deletion.go | 38 |
4 files changed, 11 insertions, 48 deletions
diff --git a/weed/filesys/dirty_page.go b/weed/filesys/dirty_page.go index 696296e62..69f652ead 100644 --- a/weed/filesys/dirty_page.go +++ b/weed/filesys/dirty_page.go @@ -4,13 +4,14 @@ import ( "bytes" "context" "fmt" + "sync" "sync/atomic" "time" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/operation" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" - "sync" + "github.com/chrislusf/seaweedfs/weed/security" ) type ContinuousDirtyPages struct { @@ -164,6 +165,7 @@ func (pages *ContinuousDirtyPages) saveExistingPagesToStorage(ctx context.Contex func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte, offset int64) (*filer_pb.FileChunk, error) { var fileId, host string + var auth security.EncodedJwt if err := pages.f.wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { @@ -181,7 +183,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte return err } - fileId, host = resp.FileId, resp.Url + fileId, host, auth = resp.FileId, resp.Url, security.EncodedJwt(resp.Auth) return nil }); err != nil { @@ -190,7 +192,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(ctx context.Context, buf []byte fileUrl := fmt.Sprintf("http://%s/%s", host, fileId) bufReader := bytes.NewReader(buf) - uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, "") + uploadResult, err := operation.Upload(fileUrl, pages.f.Name, bufReader, false, "application/octet-stream", nil, auth) if err != nil { glog.V(0).Infof("upload data %v to %s: %v", pages.f.Name, fileUrl, err) return nil, fmt.Errorf("upload data: %v", err) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 0f6ca1164..3bca0e22e 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -10,6 +10,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" "github.com/seaweedfs/fuse" "github.com/seaweedfs/fuse/fs" + "google.golang.org/grpc" "net/http" "strings" "sync" @@ -230,7 +231,7 @@ func (fh *FileHandle) Flush(ctx context.Context, req *fuse.FlushRequest) error { }) } -func deleteFileIds(ctx context.Context, client filer_pb.SeaweedFilerClient, fileIds []string) error { +func deleteFileIds(ctx context.Context, grpcDialOption grpc.DialOption, client filer_pb.SeaweedFilerClient, fileIds []string) error { var vids []string for _, fileId := range fileIds { @@ -267,7 +268,7 @@ func deleteFileIds(ctx context.Context, client filer_pb.SeaweedFilerClient, file return m, err } - _, err := operation.DeleteFilesWithLookupVolumeId(fileIds, lookupFunc) + _, err := operation.DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc) return err } diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 969514a06..f7383582d 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -19,6 +19,7 @@ import ( type Option struct { FilerGrpcAddress string + GrpcDialOption grpc.DialOption FilerMountRootPath string Collection string Replication string @@ -46,8 +47,6 @@ type WFS struct { pathToHandleLock sync.Mutex bufPool sync.Pool - fileIdsDeletionChan chan []string - stats statsCache } type statsCache struct { @@ -65,11 +64,8 @@ func NewSeaweedFileSystem(option *Option) *WFS { return make([]byte, option.ChunkSizeLimit) }, }, - fileIdsDeletionChan: make(chan []string, 32), } - go wfs.loopProcessingDeletion() - return wfs } @@ -82,7 +78,7 @@ func (wfs *WFS) withFilerClient(fn func(filer_pb.SeaweedFilerClient) error) erro return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error { client := filer_pb.NewSeaweedFilerClient(grpcConnection) return fn(client) - }, wfs.option.FilerGrpcAddress) + }, wfs.option.FilerGrpcAddress, wfs.option.GrpcDialOption) } diff --git a/weed/filesys/wfs_deletion.go b/weed/filesys/wfs_deletion.go index f58ef24f4..90058d75a 100644 --- a/weed/filesys/wfs_deletion.go +++ b/weed/filesys/wfs_deletion.go @@ -2,39 +2,9 @@ package filesys import ( "context" - "time" - - "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" ) -func (wfs *WFS) loopProcessingDeletion() { - - ticker := time.NewTicker(2 * time.Second) - - wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - var fileIds []string - for { - select { - case fids := <-wfs.fileIdsDeletionChan: - fileIds = append(fileIds, fids...) - if len(fileIds) >= 1024 { - glog.V(1).Infof("deleting fileIds len=%d", len(fileIds)) - deleteFileIds(context.Background(), client, fileIds) - fileIds = fileIds[:0] - } - case <-ticker.C: - if len(fileIds) > 0 { - glog.V(1).Infof("timed deletion fileIds len=%d", len(fileIds)) - deleteFileIds(context.Background(), client, fileIds) - fileIds = fileIds[:0] - } - } - } - }) - -} - func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { if len(chunks) == 0 { return @@ -45,14 +15,8 @@ func (wfs *WFS) deleteFileChunks(chunks []*filer_pb.FileChunk) { fileIds = append(fileIds, chunk.FileId) } - var async = false - if async { - wfs.fileIdsDeletionChan <- fileIds - return - } - wfs.withFilerClient(func(client filer_pb.SeaweedFilerClient) error { - deleteFileIds(context.Background(), client, fileIds) + deleteFileIds(context.Background(), wfs.option.GrpcDialOption, client, fileIds) return nil }) } |
