diff options
Diffstat (limited to 'weed/sftpd/sftp_filer.go')
| -rw-r--r-- | weed/sftpd/sftp_filer.go | 457 |
1 files changed, 457 insertions, 0 deletions
diff --git a/weed/sftpd/sftp_filer.go b/weed/sftpd/sftp_filer.go new file mode 100644 index 000000000..dbe6a438d --- /dev/null +++ b/weed/sftpd/sftp_filer.go @@ -0,0 +1,457 @@ +// sftp_filer_refactored.go +package sftpd + +import ( + "bytes" + "context" + "crypto/md5" + "encoding/json" + "fmt" + "io" + "net/http" + "os" + "path" + "strings" + "syscall" + "time" + + "github.com/pkg/sftp" + "github.com/seaweedfs/seaweedfs/weed/filer" + "github.com/seaweedfs/seaweedfs/weed/glog" + "github.com/seaweedfs/seaweedfs/weed/pb" + filer_pb "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb" + weed_server "github.com/seaweedfs/seaweedfs/weed/server" + "github.com/seaweedfs/seaweedfs/weed/sftpd/user" + "github.com/seaweedfs/seaweedfs/weed/util" + "google.golang.org/grpc" +) + +const ( + defaultTimeout = 30 * time.Second + defaultListLimit = 1000 +) + +// ==================== Filer RPC Helpers ==================== + +// callWithClient wraps a gRPC client call with timeout and client creation. +func (fs *SftpServer) callWithClient(streaming bool, fn func(ctx context.Context, client filer_pb.SeaweedFilerClient) error) error { + return fs.withTimeoutContext(func(ctx context.Context) error { + return fs.WithFilerClient(streaming, func(client filer_pb.SeaweedFilerClient) error { + return fn(ctx, client) + }) + }) +} + +// getEntry retrieves a single directory entry by path. +func (fs *SftpServer) getEntry(p string) (*filer_pb.Entry, error) { + dir, name := util.FullPath(p).DirAndName() + var entry *filer_pb.Entry + err := fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + r, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{Directory: dir, Name: name}) + if err != nil { + return err + } + if r.Entry == nil { + return fmt.Errorf("%s not found in %s", name, dir) + } + entry = r.Entry + return nil + }) + if err != nil { + return nil, fmt.Errorf("lookup %s: %w", p, err) + } + return entry, nil +} + +// updateEntry sends an UpdateEntryRequest for the given entry. +func (fs *SftpServer) updateEntry(dir string, entry *filer_pb.Entry) error { + return fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + _, err := client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{Directory: dir, Entry: entry}) + return err + }) +} + +// ==================== FilerClient Interface ==================== + +func (fs *SftpServer) AdjustedUrl(location *filer_pb.Location) string { return location.Url } +func (fs *SftpServer) GetDataCenter() string { return fs.dataCenter } +func (fs *SftpServer) WithFilerClient(streamingMode bool, fn func(filer_pb.SeaweedFilerClient) error) error { + addr := fs.filerAddr.ToGrpcAddress() + return pb.WithGrpcClient(streamingMode, util.RandomInt32(), func(conn *grpc.ClientConn) error { + return fn(filer_pb.NewSeaweedFilerClient(conn)) + }, addr, false, fs.grpcDialOption) +} +func (fs *SftpServer) withTimeoutContext(fn func(ctx context.Context) error) error { + ctx, cancel := context.WithTimeout(context.Background(), defaultTimeout) + defer cancel() + return fn(ctx) +} + +// ==================== Command Dispatcher ==================== + +func (fs *SftpServer) dispatchCmd(r *sftp.Request) error { + glog.V(0).Infof("Dispatch: %s %s", r.Method, r.Filepath) + switch r.Method { + case "Remove": + return fs.removeEntry(r) + case "Rename": + return fs.renameEntry(r) + case "Mkdir": + return fs.makeDir(r) + case "Rmdir": + return fs.removeDir(r) + case "Setstat": + return fs.setFileStat(r) + default: + return fmt.Errorf("unsupported: %s", r.Method) + } +} + +// ==================== File Operations ==================== + +func (fs *SftpServer) readFile(r *sftp.Request) (io.ReaderAt, error) { + if err := fs.checkFilePermission(r.Filepath, "read"); err != nil { + return nil, err + } + entry, err := fs.getEntry(r.Filepath) + if err != nil { + return nil, err + } + return &SeaweedFileReaderAt{fs: fs, entry: entry}, nil +} + +// putFile uploads a file to the filer and sets ownership metadata. +func (fs *SftpServer) putFile(filepath string, data []byte, user *user.User) error { + dir, filename := util.FullPath(filepath).DirAndName() + uploadUrl := fmt.Sprintf("http://%s%s", fs.filerAddr, filepath) + + // Create a reader from our buffered data and calculate MD5 hash + hash := md5.New() + reader := bytes.NewReader(data) + body := io.TeeReader(reader, hash) + fileSize := int64(len(data)) + + // Create and execute HTTP request + proxyReq, err := http.NewRequest(http.MethodPut, uploadUrl, body) + if err != nil { + return fmt.Errorf("create request: %v", err) + } + proxyReq.ContentLength = fileSize + proxyReq.Header.Set("Content-Type", "application/octet-stream") + + client := &http.Client{} + resp, err := client.Do(proxyReq) + if err != nil { + return fmt.Errorf("upload to filer: %v", err) + } + defer resp.Body.Close() + + // Process response + respBody, err := io.ReadAll(resp.Body) + if err != nil { + return fmt.Errorf("read response: %v", err) + } + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + return fmt.Errorf("upload failed with status %d: %s", resp.StatusCode, string(respBody)) + } + + var result weed_server.FilerPostResult + if err := json.Unmarshal(respBody, &result); err != nil { + return fmt.Errorf("parse response: %v", err) + } + + if result.Error != "" { + return fmt.Errorf("filer error: %s", result.Error) + } + + // Update file ownership using the same pattern as other functions + if user != nil { + err := fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + // Look up the file to get its current entry + lookupResp, err := client.LookupDirectoryEntry(ctx, &filer_pb.LookupDirectoryEntryRequest{ + Directory: dir, + Name: filename, + }) + if err != nil { + return fmt.Errorf("lookup file for attribute update: %v", err) + } + + if lookupResp.Entry == nil { + return fmt.Errorf("file not found after upload: %s/%s", dir, filename) + } + + // Update the entry with new uid/gid + entry := lookupResp.Entry + entry.Attributes.Uid = user.Uid + entry.Attributes.Gid = user.Gid + + // Update the entry in the filer + _, err = client.UpdateEntry(ctx, &filer_pb.UpdateEntryRequest{ + Directory: dir, + Entry: entry, + }) + return err + }) + + if err != nil { + // Log the error but don't fail the whole operation + glog.Errorf("Failed to update file ownership for %s: %v", filepath, err) + } + } + + return nil +} + +func (fs *SftpServer) newFileWriter(r *sftp.Request) (io.WriterAt, error) { + return &filerFileWriter{fs: *fs, req: r, permissions: 0644, uid: fs.user.Uid, gid: fs.user.Gid}, nil +} + +func (fs *SftpServer) removeEntry(r *sftp.Request) error { + return fs.deleteEntry(r.Filepath, false) +} + +func (fs *SftpServer) renameEntry(r *sftp.Request) error { + if err := fs.checkFilePermission(r.Filepath, "rename"); err != nil { + return err + } + oldDir, oldName := util.FullPath(r.Filepath).DirAndName() + newDir, newName := util.FullPath(r.Target).DirAndName() + return fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + _, err := client.AtomicRenameEntry(ctx, &filer_pb.AtomicRenameEntryRequest{ + OldDirectory: oldDir, OldName: oldName, + NewDirectory: newDir, NewName: newName, + }) + return err + }) +} + +func (fs *SftpServer) setFileStat(r *sftp.Request) error { + if err := fs.checkFilePermission(r.Filepath, "write"); err != nil { + return err + } + entry, err := fs.getEntry(r.Filepath) + if err != nil { + return err + } + dir, _ := util.FullPath(r.Filepath).DirAndName() + // apply attrs + if r.AttrFlags().Permissions { + entry.Attributes.FileMode = uint32(r.Attributes().FileMode()) + } + if r.AttrFlags().UidGid { + entry.Attributes.Uid = uint32(r.Attributes().UID) + entry.Attributes.Gid = uint32(r.Attributes().GID) + } + if r.AttrFlags().Acmodtime { + entry.Attributes.Mtime = int64(r.Attributes().Mtime) + } + if r.AttrFlags().Size { + entry.Attributes.FileSize = uint64(r.Attributes().Size) + } + return fs.updateEntry(dir, entry) +} + +// ==================== Directory Operations ==================== + +func (fs *SftpServer) listDir(r *sftp.Request) (sftp.ListerAt, error) { + if err := fs.checkFilePermission(r.Filepath, "list"); err != nil { + return nil, err + } + if r.Method == "Stat" || r.Method == "Lstat" { + entry, err := fs.getEntry(r.Filepath) + if err != nil { + return nil, err + } + fi := &EnhancedFileInfo{FileInfo: FileInfoFromEntry(entry), uid: entry.Attributes.Uid, gid: entry.Attributes.Gid} + return listerat([]os.FileInfo{fi}), nil + } + return fs.listAllPages(r.Filepath) +} + +func (fs *SftpServer) listAllPages(dirPath string) (sftp.ListerAt, error) { + var all []os.FileInfo + last := "" + for { + page, err := fs.fetchDirectoryPage(dirPath, last) + if err != nil { + return nil, err + } + all = append(all, page...) + if len(page) < defaultListLimit { + break + } + last = page[len(page)-1].Name() + } + return listerat(all), nil +} + +func (fs *SftpServer) fetchDirectoryPage(dirPath, start string) ([]os.FileInfo, error) { + var list []os.FileInfo + err := fs.callWithClient(true, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + stream, err := client.ListEntries(ctx, &filer_pb.ListEntriesRequest{Directory: dirPath, StartFromFileName: start, Limit: defaultListLimit}) + if err != nil { + return err + } + for { + r, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil || r.Entry == nil { + continue + } + p := path.Join(dirPath, r.Entry.Name) + if err := fs.checkFilePermission(p, "list"); err != nil { + continue + } + list = append(list, &EnhancedFileInfo{FileInfo: FileInfoFromEntry(r.Entry), uid: r.Entry.Attributes.Uid, gid: r.Entry.Attributes.Gid}) + } + return nil + }) + return list, err +} + +// makeDir creates a new directory with proper permissions. +func (fs *SftpServer) makeDir(r *sftp.Request) error { + if fs.user == nil { + return fmt.Errorf("cannot create directory: no user info") + } + dir, name := util.FullPath(r.Filepath).DirAndName() + if err := fs.checkFilePermission(dir, "mkdir"); err != nil { + return err + } + // default mode and ownership + err := filer_pb.Mkdir(fs, string(dir), name, func(entry *filer_pb.Entry) { + mode := uint32(0755 | os.ModeDir) + if strings.HasPrefix(r.Filepath, fs.user.HomeDir) { + mode = uint32(0700 | os.ModeDir) + } + entry.Attributes.FileMode = mode + entry.Attributes.Uid = fs.user.Uid + entry.Attributes.Gid = fs.user.Gid + now := time.Now().Unix() + entry.Attributes.Crtime = now + entry.Attributes.Mtime = now + if entry.Extended == nil { + entry.Extended = make(map[string][]byte) + } + entry.Extended["creator"] = []byte(fs.user.Username) + }) + return err +} + +// removeDir deletes a directory. +func (fs *SftpServer) removeDir(r *sftp.Request) error { + return fs.deleteEntry(r.Filepath, false) +} + +// ==================== Common Arguments Helpers ==================== + +func FileInfoFromEntry(e *filer_pb.Entry) FileInfo { + return FileInfo{name: e.Name, size: int64(e.Attributes.FileSize), mode: os.FileMode(e.Attributes.FileMode), modTime: time.Unix(e.Attributes.Mtime, 0), isDir: e.IsDirectory} +} + +func (fs *SftpServer) deleteEntry(p string, recursive bool) error { + if err := fs.checkFilePermission(p, "delete"); err != nil { + return err + } + dir, name := util.FullPath(p).DirAndName() + return fs.callWithClient(false, func(ctx context.Context, client filer_pb.SeaweedFilerClient) error { + r, err := client.DeleteEntry(ctx, &filer_pb.DeleteEntryRequest{Directory: dir, Name: name, IsDeleteData: true, IsRecursive: recursive}) + if err != nil { + return err + } + if r.Error != "" { + return fmt.Errorf("%s", r.Error) + } + return nil + }) +} + +// ==================== Custom Types ==================== + +type EnhancedFileInfo struct { + FileInfo + uid uint32 + gid uint32 +} + +func (fi *EnhancedFileInfo) Sys() interface{} { + return &syscall.Stat_t{Uid: fi.uid, Gid: fi.gid} +} + +func (fi *EnhancedFileInfo) Owner() (uid, gid int) { + return int(fi.uid), int(fi.gid) +} + +// SeaweedFileReaderAt implements io.ReaderAt for SeaweedFS files + +type SeaweedFileReaderAt struct { + fs *SftpServer + entry *filer_pb.Entry +} + +func (ra *SeaweedFileReaderAt) ReadAt(p []byte, off int64) (n int, err error) { + // Create a new reader for each ReadAt call + reader := filer.NewFileReader(ra.fs, ra.entry) + if reader == nil { + return 0, fmt.Errorf("failed to create file reader") + } + + // Check if we're reading past the end of the file + fileSize := int64(ra.entry.Attributes.FileSize) + if off >= fileSize { + return 0, io.EOF + } + + // Seek to the offset + if seeker, ok := reader.(io.Seeker); ok { + _, err = seeker.Seek(off, io.SeekStart) + if err != nil { + return 0, fmt.Errorf("seek error: %v", err) + } + } else { + // If the reader doesn't implement Seek, we need to read and discard bytes + toSkip := off + skipBuf := make([]byte, 8192) + for toSkip > 0 { + skipSize := int64(len(skipBuf)) + if skipSize > toSkip { + skipSize = toSkip + } + read, err := reader.Read(skipBuf[:skipSize]) + if err != nil { + return 0, fmt.Errorf("skip error: %v", err) + } + if read == 0 { + return 0, fmt.Errorf("unable to skip to offset %d", off) + } + toSkip -= int64(read) + } + } + + // Adjust read length if it would go past EOF + readLen := len(p) + remaining := fileSize - off + if int64(readLen) > remaining { + readLen = int(remaining) + if readLen == 0 { + return 0, io.EOF + } + } + + // Read the data + n, err = io.ReadFull(reader, p[:readLen]) + + // Handle EOF correctly + if err == io.ErrUnexpectedEOF || (err == nil && n < len(p)) { + err = io.EOF + } + + return n, err +} + +func (fs *SftpServer) checkFilePermission(filepath string, permissions string) error { + return fs.authManager.CheckPermission(fs.user, filepath, permissions) +} |
