diff options
Diffstat (limited to 'weed')
34 files changed, 391 insertions, 178 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go index c5d538bfe..876b1bbf0 100644 --- a/weed/command/filer.go +++ b/weed/command/filer.go @@ -138,6 +138,7 @@ func runFiler(cmd *Command, args []string) bool { startDelay := time.Duration(2) if *filerStartS3 { filerS3Options.filer = &filerAddress + filerS3Options.bindIp = f.bindIp go func() { time.Sleep(startDelay * time.Second) filerS3Options.startS3Server() diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go index af851e2b9..fc11cdbc5 100644 --- a/weed/command/filer_remote_gateway_buckets.go +++ b/weed/command/filer_remote_gateway_buckets.go @@ -181,6 +181,9 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour if message.NewParentPath == option.bucketsDir { return handleCreateBucket(message.NewEntry) } + if strings.HasPrefix(message.NewParentPath, option.bucketsDir) && strings.Contains(message.NewParentPath, "/.uploads/") { + return nil + } if !filer.HasData(message.NewEntry) { return nil } diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go index c55544925..bceeb097e 100644 --- a/weed/command/filer_remote_sync.go +++ b/weed/command/filer_remote_sync.go @@ -40,7 +40,7 @@ func init() { remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster") remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer") remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers") - remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") + remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now, skipping previous metadata changes. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"") } var cmdFilerRemoteSynchronize = &Command{ @@ -54,6 +54,11 @@ var cmdFilerRemoteSynchronize = &Command{ weed filer.remote.sync -dir=/mount/s3_on_cloud + The metadata sync starting time is determined with the following priority order: + 1. specified by timeAgo + 2. last sync timestamp for this directory + 3. directory creation time + `, } diff --git a/weed/command/s3.go b/weed/command/s3.go index 19f70bdce..d7cd7818d 100644 --- a/weed/command/s3.go +++ b/weed/command/s3.go @@ -25,6 +25,7 @@ var ( type S3Options struct { filer *string + bindIp *string port *int config *string domainName *string @@ -38,6 +39,7 @@ type S3Options struct { func init() { cmdS3.Run = runS3 // break init cycle s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address") + s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to") s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port") s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}") s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file") @@ -189,7 +191,7 @@ func (s3opt *S3Options) startS3Server() bool { httpS := &http.Server{Handler: router} - listenAddress := fmt.Sprintf(":%d", *s3opt.port) + listenAddress := fmt.Sprintf("%s:%d", *s3opt.bindIp, *s3opt.port) s3ApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second) if err != nil { glog.Fatalf("S3 API Server listener on %s error: %v", listenAddress, err) @@ -197,6 +199,9 @@ func (s3opt *S3Options) startS3Server() bool { if len(*s3opt.auditLogConfig) > 0 { s3err.InitAuditLog(*s3opt.auditLogConfig) + if s3err.Logger != nil { + defer s3err.Logger.Close() + } } if *s3opt.tlsPrivateKey != "" { diff --git a/weed/command/server.go b/weed/command/server.go index 1a6393edf..0cb748381 100644 --- a/weed/command/server.go +++ b/weed/command/server.go @@ -180,6 +180,7 @@ func runServer(cmd *Command, args []string) bool { filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddresses() filerOptions.ip = serverIp filerOptions.bindIp = serverBindIp + s3Options.bindIp = serverBindIp serverOptions.v.ip = serverIp serverOptions.v.bindIp = serverBindIp serverOptions.v.masters = pb.ServerAddresses(*masterOptions.peers).ToAddresses() diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go index 32008271b..b6a64b30d 100644 --- a/weed/filer/filechunk_manifest.go +++ b/weed/filer/filechunk_manifest.go @@ -101,6 +101,15 @@ func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0) } +func fetchChunkRange(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64, size int) ([]byte, error) { + urlStrings, err := lookupFileIdFn(fileId) + if err != nil { + glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err) + return nil, err + } + return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, false, offset, size) +} + func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) { var err error diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go index 458cf88be..5f58b870c 100644 --- a/weed/filer/reader_at.go +++ b/weed/filer/reader_at.go @@ -26,6 +26,7 @@ type ChunkReadAt struct { chunkCache chunk_cache.ChunkCache lastChunkFileId string lastChunkData []byte + readerPattern *ReaderPattern } var _ = io.ReaderAt(&ChunkReadAt{}) @@ -88,10 +89,11 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt { return &ChunkReadAt{ - chunkViews: chunkViews, - lookupFileId: lookupFn, - chunkCache: chunkCache, - fileSize: fileSize, + chunkViews: chunkViews, + lookupFileId: lookupFn, + chunkCache: chunkCache, + fileSize: fileSize, + readerPattern: NewReaderPattern(), } } @@ -106,6 +108,8 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) { c.readerLock.Lock() defer c.readerLock.Unlock() + c.readerPattern.MonitorReadAt(offset, len(p)) + // glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews)) return c.doReadAt(p, offset) } @@ -171,7 +175,14 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) { func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) { - chunkSlice := c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length) + if c.readerPattern.IsRandomMode() { + return c.doFetchRangeChunkData(chunkView, offset, length) + } + + var chunkSlice []byte + if chunkView.LogicOffset == 0 { + chunkSlice = c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length) + } if len(chunkSlice) > 0 { return chunkSlice, nil } @@ -217,7 +228,10 @@ func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, erro glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize) - data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) + var data []byte + if chunkView.LogicOffset == 0 { + data = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize) + } if data != nil { glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data))) } else { @@ -226,7 +240,10 @@ func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, erro if err != nil { return data, err } - c.chunkCache.SetChunk(chunkView.FileId, data) + if chunkView.LogicOffset == 0 { + // only cache the first chunk + c.chunkCache.SetChunk(chunkView.FileId, data) + } } return data, err }) @@ -243,3 +260,15 @@ func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error) return data, err } + +func (c *ChunkReadAt) doFetchRangeChunkData(chunkView *ChunkView, offset, length uint64) ([]byte, error) { + + glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId) + + data, err := fetchChunkRange(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(length)) + + glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId) + + return data, err + +} diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go new file mode 100644 index 000000000..2bf18d141 --- /dev/null +++ b/weed/filer/reader_pattern.go @@ -0,0 +1,31 @@ +package filer + +type ReaderPattern struct { + isStreaming bool + lastReadOffset int64 +} + +// For streaming read: only cache the first chunk +// For random read: only fetch the requested range, instead of the whole chunk + +func NewReaderPattern() *ReaderPattern { + return &ReaderPattern{ + isStreaming: true, + lastReadOffset: 0, + } +} + +func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) { + if rp.lastReadOffset > offset { + rp.isStreaming = false + } + rp.lastReadOffset = offset +} + +func (rp *ReaderPattern) IsStreamingMode() bool { + return rp.isStreaming +} + +func (rp *ReaderPattern) IsRandomMode() bool { + return !rp.isStreaming +} diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go index 9a791e013..cedcf2d76 100644 --- a/weed/filesys/dir.go +++ b/weed/filesys/dir.go @@ -161,7 +161,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest, }, } file.dirtyMetadata = true - fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid, req.Flags&fuse.OpenWriteOnly > 0) + fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid) return file, fh, nil } diff --git a/weed/filesys/dirty_pages_continuous.go b/weed/filesys/dirty_pages_continuous.go index b7514a2eb..88b50ce41 100644 --- a/weed/filesys/dirty_pages_continuous.go +++ b/weed/filesys/dirty_pages_continuous.go @@ -3,6 +3,7 @@ package filesys import ( "bytes" "fmt" + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" "io" "sync" "time" @@ -12,9 +13,8 @@ import ( ) type ContinuousDirtyPages struct { - intervals *ContinuousIntervals + intervals *page_writer.ContinuousIntervals f *File - writeOnly bool writeWaitGroup sync.WaitGroup chunkAddLock sync.Mutex lastErr error @@ -22,11 +22,10 @@ type ContinuousDirtyPages struct { replication string } -func newContinuousDirtyPages(file *File, writeOnly bool) *ContinuousDirtyPages { +func newContinuousDirtyPages(file *File) *ContinuousDirtyPages { dirtyPages := &ContinuousDirtyPages{ - intervals: &ContinuousIntervals{}, + intervals: &page_writer.ContinuousIntervals{}, f: file, - writeOnly: writeOnly, } return dirtyPages } @@ -107,7 +106,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64, defer pages.writeWaitGroup.Done() reader = io.LimitReader(reader, size) - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset) + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) if err != nil { glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) pages.lastErr = err @@ -148,13 +147,3 @@ func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int6 func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) { return pages.collection, pages.replication } - -func (pages *ContinuousDirtyPages) SetWriteOnly(writeOnly bool) { - if pages.writeOnly { - pages.writeOnly = writeOnly - } -} - -func (pages *ContinuousDirtyPages) GetWriteOnly() (writeOnly bool) { - return pages.writeOnly -} diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go index 9fa7c0c8e..6a22889dc 100644 --- a/weed/filesys/dirty_pages_temp_file.go +++ b/weed/filesys/dirty_pages_temp_file.go @@ -2,6 +2,7 @@ package filesys import ( "fmt" + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" "github.com/chrislusf/seaweedfs/weed/glog" "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "io" @@ -13,8 +14,7 @@ import ( type TempFileDirtyPages struct { f *File tf *os.File - writtenIntervals *WrittenContinuousIntervals - writeOnly bool + writtenIntervals *page_writer.WrittenContinuousIntervals writeWaitGroup sync.WaitGroup pageAddLock sync.Mutex chunkAddLock sync.Mutex @@ -23,12 +23,11 @@ type TempFileDirtyPages struct { replication string } -func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages { +func newTempFileDirtyPages(file *File) *TempFileDirtyPages { tempFile := &TempFileDirtyPages{ f: file, - writeOnly: writeOnly, - writtenIntervals: &WrittenContinuousIntervals{}, + writtenIntervals: &page_writer.WrittenContinuousIntervals{}, } return tempFile @@ -47,11 +46,11 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { return } pages.tf = tf - pages.writtenIntervals.tempFile = tf - pages.writtenIntervals.lastOffset = 0 + pages.writtenIntervals.TempFile = tf + pages.writtenIntervals.LastOffset = 0 } - writtenOffset := pages.writtenIntervals.lastOffset + writtenOffset := pages.writtenIntervals.LastOffset dataSize := int64(len(data)) // glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize) @@ -60,7 +59,7 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) { pages.lastErr = err } else { pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset) - pages.writtenIntervals.lastOffset += dataSize + pages.writtenIntervals.LastOffset += dataSize } // pages.writtenIntervals.debug() @@ -79,8 +78,8 @@ func (pages *TempFileDirtyPages) FlushData() error { defer pages.pageAddLock.Unlock() if pages.tf != nil { - pages.writtenIntervals.tempFile = nil - pages.writtenIntervals.lists = nil + pages.writtenIntervals.TempFile = nil + pages.writtenIntervals.Lists = nil pages.tf.Close() os.Remove(pages.tf.Name()) @@ -95,7 +94,7 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() { // glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists)) - for _, list := range pages.writtenIntervals.lists { + for _, list := range pages.writtenIntervals.Lists { listStopOffset := list.Offset() + list.Size() for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize { start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize) @@ -117,7 +116,7 @@ func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, s defer pages.writeWaitGroup.Done() reader = io.LimitReader(reader, size) - chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset) + chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset) if err != nil { glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err) pages.lastErr = err @@ -145,13 +144,3 @@ func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64) func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) { return pages.collection, pages.replication } - -func (pages *TempFileDirtyPages) SetWriteOnly(writeOnly bool) { - if pages.writeOnly { - pages.writeOnly = writeOnly - } -} - -func (pages *TempFileDirtyPages) GetWriteOnly() (writeOnly bool) { - return pages.writeOnly -} diff --git a/weed/filesys/file.go b/weed/filesys/file.go index f8fd7ad99..767841f9d 100644 --- a/weed/filesys/file.go +++ b/weed/filesys/file.go @@ -97,7 +97,7 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op glog.V(4).Infof("file %v open %+v", file.fullpath(), req) - handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid, req.Flags&fuse.OpenWriteOnly > 0) + handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid) resp.Handle = fuse.HandleID(handle.handle) diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go index 34affddb9..d92b17b5b 100644 --- a/weed/filesys/filehandle.go +++ b/weed/filesys/filehandle.go @@ -3,6 +3,7 @@ package filesys import ( "context" "fmt" + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" "io" "math" "net/http" @@ -20,7 +21,7 @@ import ( type FileHandle struct { // cache file has been written to - dirtyPages DirtyPages + dirtyPages page_writer.DirtyPages entryViewCache []filer.VisibleInterval reader io.ReaderAt contentType string @@ -36,11 +37,11 @@ type FileHandle struct { isDeleted bool } -func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle { +func newFileHandle(file *File, uid, gid uint32) *FileHandle { fh := &FileHandle{ f: file, // dirtyPages: newContinuousDirtyPages(file, writeOnly), - dirtyPages: newTempFileDirtyPages(file, writeOnly), + dirtyPages: newPageWriter(file, 2*1024*1024), Uid: uid, Gid: gid, } @@ -62,10 +63,11 @@ var _ = fs.HandleReleaser(&FileHandle{}) func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error { - glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data)) fh.Lock() defer fh.Unlock() + glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data)) + if req.Size <= 0 { return nil } @@ -173,7 +175,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f // write the request to volume servers data := req.Data - if len(data) <= 512 { + if len(data) <= 512 && req.Offset == 0 { // fuse message cacheable size data = make([]byte, len(req.Data)) copy(data, req.Data) @@ -303,7 +305,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error { manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks) chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks) - chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath(), fh.dirtyPages.GetWriteOnly()), chunks) + chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks) if manifestErr != nil { // not good, but should be ok glog.V(0).Infof("MaybeManifestize: %v", manifestErr) diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go new file mode 100644 index 000000000..9c9e38968 --- /dev/null +++ b/weed/filesys/page_writer.go @@ -0,0 +1,78 @@ +package filesys + +import ( + "github.com/chrislusf/seaweedfs/weed/filesys/page_writer" + "github.com/chrislusf/seaweedfs/weed/glog" +) + +type PageWriter struct { + f *File + collection string + replication string + chunkSize int64 + writerPattern *WriterPattern + + randomWriter page_writer.DirtyPages + streamWriter page_writer.DirtyPages +} + +var ( + _ = page_writer.DirtyPages(&PageWriter{}) +) + +func newPageWriter(file *File, chunkSize int64) *PageWriter { + pw := &PageWriter{ + f: file, + chunkSize: chunkSize, + randomWriter: newTempFileDirtyPages(file), + streamWriter: newContinuousDirtyPages(file), + writerPattern: NewWriterPattern(file.Name, chunkSize), + } + return pw +} + +func (pw *PageWriter) AddPage(offset int64, data []byte) { + + glog.V(4).Infof("AddPage %v [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode()) + + pw.writerPattern.MonitorWriteAt(offset, len(data)) + + chunkIndex := offset / pw.chunkSize + for i := chunkIndex; len(data) > 0; i++ { + writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) + pw.addToOneChunk(i, offset, data[:writeSize]) + offset += writeSize + data = data[writeSize:] + } +} + +func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) { + if chunkIndex > 0 { + if pw.writerPattern.IsStreamingMode() { + pw.streamWriter.AddPage(offset, data) + return + } + } + pw.randomWriter.AddPage(offset, data) +} + +func (pw *PageWriter) FlushData() error { + if err := pw.streamWriter.FlushData(); err != nil { + return err + } + return pw.randomWriter.FlushData() +} + +func (pw *PageWriter) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) { + glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), startOffset, startOffset+int64(len(data))) + m1 := pw.streamWriter.ReadDirtyDataAt(data, startOffset) + m2 := pw.randomWriter.ReadDirtyDataAt(data, startOffset) + return max(m1, m2) +} + +func (pw *PageWriter) GetStorageOptions() (collection, replication string) { + if pw.writerPattern.IsStreamingMode() { + return pw.streamWriter.GetStorageOptions() + } + return pw.randomWriter.GetStorageOptions() +} diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/page_writer/dirty_page_interval.go index 304793340..6d73b8cd7 100644 --- a/weed/filesys/dirty_page_interval.go +++ b/weed/filesys/page_writer/dirty_page_interval.go @@ -1,4 +1,4 @@ -package filesys +package page_writer import ( "io" diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/page_writer/dirty_page_interval_test.go index d02ad27fd..2a2a1df4d 100644 --- a/weed/filesys/dirty_page_interval_test.go +++ b/weed/filesys/page_writer/dirty_page_interval_test.go @@ -1,4 +1,4 @@ -package filesys +package page_writer import ( "bytes" diff --git a/weed/filesys/dirty_pages.go b/weed/filesys/page_writer/dirty_pages.go index 8505323ef..c18f847b7 100644 --- a/weed/filesys/dirty_pages.go +++ b/weed/filesys/page_writer/dirty_pages.go @@ -1,10 +1,8 @@ -package filesys +package page_writer type DirtyPages interface { AddPage(offset int64, data []byte) FlushData() error ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) GetStorageOptions() (collection, replication string) - SetWriteOnly(writeOnly bool) - GetWriteOnly() (writeOnly bool) } diff --git a/weed/filesys/dirty_pages_temp_interval.go b/weed/filesys/page_writer/dirty_pages_temp_interval.go index 42c4b5a3b..aeaf0ec6f 100644 --- a/weed/filesys/dirty_pages_temp_interval.go +++ b/weed/filesys/page_writer/dirty_pages_temp_interval.go @@ -1,4 +1,4 @@ -package filesys +package page_writer import ( "io" @@ -20,9 +20,9 @@ type WrittenIntervalLinkedList struct { } type WrittenContinuousIntervals struct { - tempFile *os.File - lastOffset int64 - lists []*WrittenIntervalLinkedList + TempFile *os.File + LastOffset int64 + Lists []*WrittenIntervalLinkedList } func (list *WrittenIntervalLinkedList) Offset() int64 { @@ -65,7 +65,7 @@ func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) { } func (c *WrittenContinuousIntervals) TotalSize() (total int64) { - for _, list := range c.lists { + for _, list := range c.Lists { total += list.Size() } return @@ -98,7 +98,7 @@ func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenInterv func (c *WrittenContinuousIntervals) debug() { log.Printf("++") - for _, l := range c.lists { + for _, l := range c.Lists { log.Printf("++++") for t := l.Head; ; t = t.Next { log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size) @@ -116,8 +116,8 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)} // append to the tail and return - if len(c.lists) == 1 { - lastSpan := c.lists[0] + if len(c.Lists) == 1 { + lastSpan := c.Lists[0] if lastSpan.Tail.DataOffset+lastSpan.Tail.Size == dataOffset { lastSpan.addNodeToTail(interval) return @@ -125,7 +125,7 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, } var newLists []*WrittenIntervalLinkedList - for _, list := range c.lists { + for _, list := range c.Lists { // if list is to the left of new interval, add to the new list if list.Tail.DataOffset+list.Tail.Size <= interval.DataOffset { newLists = append(newLists, list) @@ -147,18 +147,18 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, // skip anything that is fully overwritten by the new interval } - c.lists = newLists + c.Lists = newLists // add the new interval to the lists, connecting neighbor lists var prevList, nextList *WrittenIntervalLinkedList - for _, list := range c.lists { + for _, list := range c.Lists { if list.Head.DataOffset == interval.DataOffset+interval.Size { nextList = list break } } - for _, list := range c.lists { + for _, list := range c.Lists { if list.Head.DataOffset+list.Size() == dataOffset { list.addNodeToTail(interval) prevList = list @@ -176,8 +176,8 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, nextList.addNodeToHead(interval) } if prevList == nil && nextList == nil { - c.lists = append(c.lists, &WrittenIntervalLinkedList{ - tempFile: c.tempFile, + c.Lists = append(c.Lists, &WrittenIntervalLinkedList{ + tempFile: c.TempFile, Head: interval, Tail: interval, }) @@ -189,7 +189,7 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int, func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenIntervalLinkedList { var maxSize int64 maxIndex := -1 - for k, list := range c.lists { + for k, list := range c.Lists { if maxSize <= list.Size() { maxSize = list.Size() maxIndex = k @@ -199,16 +199,16 @@ func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenI return nil } - t := c.lists[maxIndex] - t.tempFile = c.tempFile - c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...) + t := c.Lists[maxIndex] + t.tempFile = c.TempFile + c.Lists = append(c.Lists[0:maxIndex], c.Lists[maxIndex+1:]...) return t } func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedList) { index := -1 - for k, list := range c.lists { + for k, list := range c.Lists { if list.Offset() == target.Offset() { index = k } @@ -217,12 +217,12 @@ func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedLis return } - c.lists = append(c.lists[0:index], c.lists[index+1:]...) + c.Lists = append(c.Lists[0:index], c.Lists[index+1:]...) } func (c *WrittenContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) { - for _, list := range c.lists { + for _, list := range c.Lists { start := max(startOffset, list.Offset()) stop := min(startOffset+int64(len(data)), list.Offset()+list.Size()) if start < stop { @@ -287,3 +287,16 @@ func (f *FileSectionReader) Read(p []byte) (n int, err error) { } return } + +func max(x, y int64) int64 { + if x > y { + return x + } + return y +} +func min(x, y int64) int64 { + if x < y { + return x + } + return y +} diff --git a/weed/filesys/page_writer/page_chunk.go b/weed/filesys/page_writer/page_chunk.go new file mode 100644 index 000000000..b21e6acca --- /dev/null +++ b/weed/filesys/page_writer/page_chunk.go @@ -0,0 +1 @@ +package page_writer diff --git a/weed/filesys/page_writer_pattern.go b/weed/filesys/page_writer_pattern.go new file mode 100644 index 000000000..42ca3d969 --- /dev/null +++ b/weed/filesys/page_writer_pattern.go @@ -0,0 +1,38 @@ +package filesys + +type WriterPattern struct { + isStreaming bool + lastWriteOffset int64 + chunkSize int64 + fileName string +} + +// For streaming write: only cache the first chunk +// For random write: fall back to temp file approach +// writes can only change from streaming mode to non-streaming mode + +func NewWriterPattern(fileName string, chunkSize int64) *WriterPattern { + return &WriterPattern{ + isStreaming: true, + lastWriteOffset: 0, + chunkSize: chunkSize, + fileName: fileName, + } +} + +func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) { + if rp.lastWriteOffset == 0 { + } + if rp.lastWriteOffset > offset { + rp.isStreaming = false + } + rp.lastWriteOffset = offset +} + +func (rp *WriterPattern) IsStreamingMode() bool { + return rp.isStreaming +} + +func (rp *WriterPattern) IsRandomMode() bool { + return !rp.isStreaming +} diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go index 92f6bae38..aa4f9dacd 100644 --- a/weed/filesys/wfs.go +++ b/weed/filesys/wfs.go @@ -148,7 +148,7 @@ func (wfs *WFS) Root() (fs.Node, error) { return wfs.root, nil } -func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (fileHandle *FileHandle) { +func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) { fullpath := file.fullpath() glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid) @@ -160,7 +160,6 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (file if found && existingHandle != nil && existingHandle.f.isOpen > 0 { existingHandle.f.isOpen++ wfs.handlesLock.Unlock() - existingHandle.dirtyPages.SetWriteOnly(writeOnly) glog.V(4).Infof("Reuse AcquiredHandle %s open %d", fullpath, existingHandle.f.isOpen) return existingHandle } @@ -168,7 +167,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (file entry, _ := file.maybeLoadEntry(context.Background()) file.entry = entry - fileHandle = newFileHandle(file, uid, gid, writeOnly) + fileHandle = newFileHandle(file, uid, gid) wfs.handlesLock.Lock() file.isOpen++ diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go index 3d08cb5e2..61a463e88 100644 --- a/weed/filesys/wfs_write.go +++ b/weed/filesys/wfs_write.go @@ -13,7 +13,7 @@ import ( "github.com/chrislusf/seaweedfs/weed/util" ) -func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.SaveDataAsChunkFunctionType { +func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType { return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) { var fileId, host string @@ -74,7 +74,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.Sa return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error) } - if !writeOnly { + if offset == 0 { wfs.chunkCache.SetChunk(fileId, data) } diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index a73db81ec..0d46ad7ca 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -203,33 +203,44 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action) var identity *Identity var s3Err s3err.ErrorCode var found bool + var authType string switch getRequestAuthType(r) { case authTypeStreamingSigned: return identity, s3err.ErrNone case authTypeUnknown: glog.V(3).Infof("unknown auth type") + r.Header.Set(xhttp.AmzAuthType, "Unknown") return identity, s3err.ErrAccessDenied case authTypePresignedV2, authTypeSignedV2: glog.V(3).Infof("v2 auth type") identity, s3Err = iam.isReqAuthenticatedV2(r) + authType = "SigV2" case authTypeSigned, authTypePresigned: glog.V(3).Infof("v4 auth type") identity, s3Err = iam.reqSignatureV4Verify(r) + authType = "SigV4" case authTypePostPolicy: glog.V(3).Infof("post policy auth type") + r.Header.Set(xhttp.AmzAuthType, "PostPolicy") return identity, s3err.ErrNone case authTypeJWT: glog.V(3).Infof("jwt auth type") + r.Header.Set(xhttp.AmzAuthType, "Jwt") return identity, s3err.ErrNotImplemented case authTypeAnonymous: + authType = "Anonymous" identity, found = iam.lookupAnonymous() if !found { + r.Header.Set(xhttp.AmzAuthType, authType) return identity, s3err.ErrAccessDenied } default: return identity, s3err.ErrNotImplemented } + if len(authType) > 0 { + r.Header.Set(xhttp.AmzAuthType, authType) + } if s3Err != s3err.ErrNone { return identity, s3Err } @@ -250,33 +261,45 @@ func (iam *IdentityAccessManagement) authUser(r *http.Request) (*Identity, s3err var identity *Identity var s3Err s3err.ErrorCode var found bool + var authType string switch getRequestAuthType(r) { case authTypeStreamingSigned: return identity, s3err.ErrNone case authTypeUnknown: glog.V(3).Infof("unknown auth type") + r.Header.Set(xhttp.AmzAuthType, "Unknown") return identity, s3err.ErrAccessDenied case authTypePresignedV2, authTypeSignedV2: glog.V(3).Infof("v2 auth type") identity, s3Err = iam.isReqAuthenticatedV2(r) + authType = "SigV2" case authTypeSigned, authTypePresigned: glog.V(3).Infof("v4 auth type") identity, s3Err = iam.reqSignatureV4Verify(r) + authType = "SigV4" case authTypePostPolicy: glog.V(3).Infof("post policy auth type") + r.Header.Set(xhttp.AmzAuthType, "PostPolicy") return identity, s3err.ErrNone case authTypeJWT: glog.V(3).Infof("jwt auth type") + r.Header.Set(xhttp.AmzAuthType, "Jwt") return identity, s3err.ErrNotImplemented case authTypeAnonymous: + authType = "Anonymous" identity, found = iam.lookupAnonymous() if !found { + r.Header.Set(xhttp.AmzAuthType, authType) return identity, s3err.ErrAccessDenied } default: return identity, s3err.ErrNotImplemented } + if len(authType) > 0 { + r.Header.Set(xhttp.AmzAuthType, authType) + } + glog.V(3).Infof("auth error: %v", s3Err) if s3Err != s3err.ErrNone { return identity, s3Err diff --git a/weed/s3api/http/header.go b/weed/s3api/http/header.go index 7579cf312..d63d50443 100644 --- a/weed/s3api/http/header.go +++ b/weed/s3api/http/header.go @@ -38,6 +38,7 @@ const ( // Non-Standard S3 HTTP request constants const ( AmzIdentityId = "s3-identity-id" + AmzAuthType = "s3-auth-type" AmzIsAdmin = "s3-is-admin" // only set to http request header as a context ) @@ -51,3 +52,12 @@ func GetBucketAndObject(r *http.Request) (bucket, object string) { return } + +var PassThroughHeaders = map[string]string{ + "response-cache-control": "Cache-Control", + "response-content-disposition": "Content-Disposition", + "response-content-encoding": "Content-Encoding", + "response-content-language": "Content-Language", + "response-content-type": "Content-Type", + "response-expires": "Expires", +} diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go index 6d1ec303e..2ac9c8102 100644 --- a/weed/s3api/s3api_object_handlers.go +++ b/weed/s3api/s3api_object_handlers.go @@ -261,7 +261,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h } if auditLog != nil { auditLog.Key = entryName - s3err.PostAccessLog(auditLog) + s3err.PostAccessLog(*auditLog) } } @@ -306,15 +306,6 @@ func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerCli return } -var passThroughHeaders = []string{ - "response-cache-control", - "response-content-disposition", - "response-content-encoding", - "response-content-language", - "response-content-type", - "response-expires", -} - func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int)) { glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl) @@ -328,25 +319,14 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des } proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr) - - for header, values := range r.Header { - // handle s3 related headers - passed := false - for _, h := range passThroughHeaders { - if strings.ToLower(header) == h && len(values) > 0 { - proxyReq.Header.Add(header[len("response-"):], values[0]) - passed = true - break - } - } - if passed { - continue - } - // handle other headers - for _, value := range values { - proxyReq.Header.Add(header, value) + for k, v := range r.URL.Query() { + if _, ok := xhttp.PassThroughHeaders[strings.ToLower(k)]; ok { + proxyReq.Header[k] = v } } + for header, values := range r.Header { + proxyReq.Header[header] = values + } resp, postErr := client.Do(proxyReq) diff --git a/weed/s3api/s3api_status_handlers.go b/weed/s3api/s3api_status_handlers.go index 2ee6c37b2..fafb6ac2f 100644 --- a/weed/s3api/s3api_status_handlers.go +++ b/weed/s3api/s3api_status_handlers.go @@ -1,8 +1,11 @@ package s3api -import "net/http" +import ( + "github.com/chrislusf/seaweedfs/weed/s3api/s3err" + "net/http" +) func (s3a *S3ApiServer) StatusHandler(w http.ResponseWriter, r *http.Request) { // write out the response code and content type header - writeSuccessResponseEmpty(w, r) + s3err.WriteResponse(w, r, http.StatusOK, []byte{}, "") } diff --git a/weed/s3api/s3err/audit_fluent.go b/weed/s3api/s3err/audit_fluent.go index bf935c8d8..fcc5f9a0f 100644 --- a/weed/s3api/s3err/audit_fluent.go +++ b/weed/s3api/s3err/audit_fluent.go @@ -48,23 +48,35 @@ type AccessLogHTTP struct { const tag = "s3.access" var ( - Logger *fluent.Fluent - hostname = os.Getenv("HOSTNAME") + Logger *fluent.Fluent + hostname = os.Getenv("HOSTNAME") + environment = os.Getenv("ENVIRONMENT") ) func InitAuditLog(config string) { configContent, readErr := os.ReadFile(config) if readErr != nil { - glog.Fatalf("fail to read fluent config %s : %v", config, readErr) + glog.Errorf("fail to read fluent config %s : %v", config, readErr) + return + } + fluentConfig := &fluent.Config{} + if err := json.Unmarshal(configContent, fluentConfig); err != nil { + glog.Errorf("fail to parse fluent config %s : %v", string(configContent), err) + return } - var fluentConfig fluent.Config - if err := json.Unmarshal(configContent, &fluentConfig); err != nil { - glog.Fatalf("fail to parse fluent config %s : %v", config, err) + if len(fluentConfig.TagPrefix) == 0 && len(environment) > 0 { + fluentConfig.TagPrefix = environment + } + fluentConfig.Async = true + fluentConfig.AsyncResultCallback = func(data []byte, err error) { + if err != nil { + glog.Warning("Error while posting log: ", err) + } } var err error - Logger, err = fluent.New(fluentConfig) + Logger, err = fluent.New(*fluentConfig) if err != nil { - glog.Fatalf("fail to load fluent config: %v", err) + glog.Errorf("fail to load fluent config: %v", err) } } @@ -131,23 +143,24 @@ func GetAccessLog(r *http.Request, HTTPStatusCode int, s3errCode ErrorCode) *Acc if len(remoteIP) == 0 { remoteIP = r.RemoteAddr } - hostHeader := r.Header.Get("Host") + hostHeader := r.Header.Get("X-Forwarded-Host") if len(hostHeader) == 0 { - hostHeader = r.URL.Hostname() + hostHeader = r.Host } return &AccessLog{ - HostHeader: hostHeader, - RequestID: r.Header.Get("X-Request-ID"), - RemoteIP: remoteIP, - Requester: r.Header.Get(xhttp.AmzIdentityId), - UserAgent: r.Header.Get("UserAgent"), - HostId: hostname, - Bucket: bucket, - HTTPStatus: HTTPStatusCode, - Time: time.Now().Unix(), - Key: key, - Operation: getOperation(key, r), - ErrorCode: errorCode, + HostHeader: hostHeader, + RequestID: r.Header.Get("X-Request-ID"), + RemoteIP: remoteIP, + Requester: r.Header.Get(xhttp.AmzIdentityId), + SignatureVersion: r.Header.Get(xhttp.AmzAuthType), + UserAgent: r.Header.Get("user-agent"), + HostId: hostname, + Bucket: bucket, + HTTPStatus: HTTPStatusCode, + Time: time.Now().Unix(), + Key: key, + Operation: getOperation(key, r), + ErrorCode: errorCode, } } @@ -160,11 +173,11 @@ func PostLog(r *http.Request, HTTPStatusCode int, errorCode ErrorCode) { } } -func PostAccessLog(log *AccessLog) { - if Logger == nil || log == nil { +func PostAccessLog(log AccessLog) { + if Logger == nil || len(log.Key) == 0 { return } - if err := Logger.Post(tag, *log); err != nil { + if err := Logger.Post(tag, log); err != nil { glog.Warning("Error while posting log: ", err) } } diff --git a/weed/server/common.go b/weed/server/common.go index 16213689d..ba4d13456 100644 --- a/weed/server/common.go +++ b/weed/server/common.go @@ -5,6 +5,7 @@ import ( "encoding/json" "errors" "fmt" + xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http" "io" "io/fs" "mime/multipart" @@ -250,13 +251,16 @@ func handleStaticResources2(r *mux.Router) { r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(http.FS(StaticFS)))) } -func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, filename string) { - responseContentDisposition := r.FormValue("response-content-disposition") - if responseContentDisposition != "" { - w.Header().Set("Content-Disposition", responseContentDisposition) - return +func adjustPassthroughHeaders(w http.ResponseWriter, r *http.Request, filename string) { + for header, values := range r.Header { + if normalizedHeader, ok := xhttp.PassThroughHeaders[strings.ToLower(header)]; ok { + w.Header()[normalizedHeader] = values + } } - if w.Header().Get("Content-Disposition") != "" { + adjustHeaderContentDisposition(w, r, filename) +} +func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, filename string) { + if contentDisposition := w.Header().Get("Content-Disposition"); contentDisposition != "" { return } if filename != "" { diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go index 86e4af586..ac6aea056 100644 --- a/weed/server/filer_server_handlers_read.go +++ b/weed/server/filer_server_handlers_read.go @@ -130,7 +130,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) setEtag(w, etag) filename := entry.Name() - adjustHeaderContentDisposition(w, r, filename) + adjustPassthroughHeaders(w, r, filename) totalSize := int64(entry.Size()) diff --git a/weed/server/master_server.go b/weed/server/master_server.go index f897ffe7d..cbc0aa337 100644 --- a/weed/server/master_server.go +++ b/weed/server/master_server.go @@ -213,17 +213,7 @@ func (ms *MasterServer) startAdminScripts() { v := util.GetViper() adminScripts := v.GetString("master.maintenance.scripts") if adminScripts == "" { - adminScripts = ` - lock - ec.encode -fullPercent=95 -quietFor=1h - ec.rebuild -force - ec.balance -force - volume.deleteEmpty -quietFor=24h -force - volume.balance -force - volume.fix.replication - s3.clean.uploads -timeAgo=24h - unlock - ` + return } glog.V(0).Infof("adminScripts: %v", adminScripts) diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go index 5d12108d3..5ce2278bf 100644 --- a/weed/server/volume_server_handlers_read.go +++ b/weed/server/volume_server_handlers_read.go @@ -29,8 +29,6 @@ var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`) func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) { - glog.V(9).Info(r.Method + " " + r.URL.Path + " " + r.Header.Get("Range")) - stats.VolumeServerRequestCounter.WithLabelValues("get").Inc() start := time.Now() defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }() @@ -301,7 +299,7 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re } w.Header().Set("Accept-Ranges", "bytes") - adjustHeaderContentDisposition(w, r, filename) + adjustPassthroughHeaders(w, r, filename) if r.Method == "HEAD" { w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10)) diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go index 98042f3c8..af4ec1eb4 100644 --- a/weed/storage/disk_location.go +++ b/weed/storage/disk_location.go @@ -334,12 +334,12 @@ func (l *DiskLocation) Close() { } func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.DirEntry, bool) { - println("LocateVolume", vid, "on", l.Directory) + // println("LocateVolume", vid, "on", l.Directory) if dirEntries, err := os.ReadDir(l.Directory); err == nil { for _, entry := range dirEntries { - println("checking", entry.Name(), "...") + // println("checking", entry.Name(), "...") volId, _, err := volumeIdFromFileName(entry.Name()) - println("volId", volId, "err", err) + // println("volId", volId, "err", err) if vid == volId && err == nil { return entry, true } diff --git a/weed/topology/topology.go b/weed/topology/topology.go index ad440e244..207c89ad7 100644 --- a/weed/topology/topology.go +++ b/weed/topology/topology.go @@ -206,7 +206,7 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { vl.EnsureCorrectWritables(&v) } func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) { - glog.Infof("removing volume info: %+v", v) + glog.Infof("removing volume info: %+v from %v", v, dn.id) diskType := types.ToDiskType(v.DiskType) volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType) volumeLayout.UnRegisterVolume(&v, dn) diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go index a70e9aec9..dbfb439bd 100644 --- a/weed/topology/volume_layout.go +++ b/weed/topology/volume_layout.go @@ -281,7 +281,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n //glog.V(0).Infoln("No more writable volumes!") return nil, 0, nil, errors.New("No more writable volumes!") } - if option.DataCenter == "" { + if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" { vid := vl.writables[rand.Intn(lenWriters)] locationList := vl.vid2location[vid] if locationList != nil { @@ -295,17 +295,18 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n for _, v := range vl.writables { volumeLocationList := vl.vid2location[v] for _, dn := range volumeLocationList.list { - if dn.GetDataCenter().Id() == NodeId(option.DataCenter) { - if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) { - continue - } - if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) { - continue - } - counter++ - if rand.Intn(counter) < 1 { - vid, locationList = v, volumeLocationList.Copy() - } + if option.DataCenter != "" && dn.GetDataCenter().Id() != NodeId(option.DataCenter) { + continue + } + if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) { + continue + } + if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) { + continue + } + counter++ + if rand.Intn(counter) < 1 { + vid, locationList = v, volumeLocationList.Copy() } } } |
