about summary refs log tree commit diff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r-- weed/command/filer.go | 1
-rw-r--r-- weed/command/filer_remote_gateway_buckets.go | 3
-rw-r--r-- weed/command/filer_remote_sync.go | 7
-rw-r--r-- weed/command/s3.go | 7
-rw-r--r-- weed/command/server.go | 1
-rw-r--r-- weed/filer/filechunk_manifest.go | 9
-rw-r--r-- weed/filer/reader_at.go | 43
-rw-r--r-- weed/filer/reader_pattern.go | 31
-rw-r--r-- weed/filesys/dir.go | 2
-rw-r--r-- weed/filesys/dirty_pages_continuous.go | 21
-rw-r--r-- weed/filesys/dirty_pages_temp_file.go | 35
-rw-r--r-- weed/filesys/file.go | 2
-rw-r--r-- weed/filesys/filehandle.go | 14
-rw-r--r-- weed/filesys/page_writer.go | 78
-rw-r--r-- weed/filesys/page_writer/dirty_page_interval.go (renamed from weed/filesys/dirty_page_interval.go) | 2
-rw-r--r-- weed/filesys/page_writer/dirty_page_interval_test.go (renamed from weed/filesys/dirty_page_interval_test.go) | 2
-rw-r--r-- weed/filesys/page_writer/dirty_pages.go (renamed from weed/filesys/dirty_pages.go) | 4
-rw-r--r-- weed/filesys/page_writer/dirty_pages_temp_interval.go (renamed from weed/filesys/dirty_pages_temp_interval.go) | 55
-rw-r--r-- weed/filesys/page_writer/page_chunk.go | 1
-rw-r--r-- weed/filesys/page_writer_pattern.go | 38
-rw-r--r-- weed/filesys/wfs.go | 5
-rw-r--r-- weed/filesys/wfs_write.go | 4
-rw-r--r-- weed/s3api/auth_credentials.go | 23
-rw-r--r-- weed/s3api/http/header.go | 10
-rw-r--r-- weed/s3api/s3api_object_handlers.go | 34
-rw-r--r-- weed/s3api/s3api_status_handlers.go | 7
-rw-r--r-- weed/s3api/s3err/audit_fluent.go | 63
-rw-r--r-- weed/server/common.go | 16
-rw-r--r-- weed/server/filer_server_handlers_read.go | 2
-rw-r--r-- weed/server/master_server.go | 12
-rw-r--r-- weed/server/volume_server_handlers_read.go | 4
-rw-r--r-- weed/storage/disk_location.go | 6
-rw-r--r-- weed/topology/topology.go | 2
-rw-r--r-- weed/topology/volume_layout.go | 25
34 files changed, 391 insertions, 178 deletions
diff --git a/weed/command/filer.go b/weed/command/filer.go
index c5d538bfe..876b1bbf0 100644
--- a/weed/command/filer.go
+++ b/weed/command/filer.go
@@ -138,6 +138,7 @@ func runFiler(cmd *Command, args []string) bool {
startDelay := time.Duration(2)
if *filerStartS3 {
filerS3Options.filer = &filerAddress
+ filerS3Options.bindIp = f.bindIp
go func() {
time.Sleep(startDelay * time.Second)
filerS3Options.startS3Server()
diff --git a/weed/command/filer_remote_gateway_buckets.go b/weed/command/filer_remote_gateway_buckets.go
index af851e2b9..fc11cdbc5 100644
--- a/weed/command/filer_remote_gateway_buckets.go
+++ b/weed/command/filer_remote_gateway_buckets.go
@@ -181,6 +181,9 @@ func (option *RemoteGatewayOptions) makeBucketedEventProcessor(filerSource *sour
if message.NewParentPath == option.bucketsDir {
return handleCreateBucket(message.NewEntry)
}
+ if strings.HasPrefix(message.NewParentPath, option.bucketsDir) && strings.Contains(message.NewParentPath, "/.uploads/") {
+ return nil
+ }
if !filer.HasData(message.NewEntry) {
return nil
}
diff --git a/weed/command/filer_remote_sync.go b/weed/command/filer_remote_sync.go
index c55544925..bceeb097e 100644
--- a/weed/command/filer_remote_sync.go
+++ b/weed/command/filer_remote_sync.go
@@ -40,7 +40,7 @@ func init() {
remoteSyncOptions.filerAddress = cmdFilerRemoteSynchronize.Flag.String("filer", "localhost:8888", "filer of the SeaweedFS cluster")
remoteSyncOptions.dir = cmdFilerRemoteSynchronize.Flag.String("dir", "", "a mounted directory on filer")
remoteSyncOptions.readChunkFromFiler = cmdFilerRemoteSynchronize.Flag.Bool("filerProxy", false, "read file chunks from filer instead of volume servers")
- remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
+ remoteSyncOptions.timeAgo = cmdFilerRemoteSynchronize.Flag.Duration("timeAgo", 0, "start time before now, skipping previous metadata changes. \"300ms\", \"1.5h\" or \"2h45m\". Valid time units are \"ns\", \"us\" (or \"µs\"), \"ms\", \"s\", \"m\", \"h\"")
}
var cmdFilerRemoteSynchronize = &Command{
@@ -54,6 +54,11 @@ var cmdFilerRemoteSynchronize = &Command{
weed filer.remote.sync -dir=/mount/s3_on_cloud
+ The metadata sync starting time is determined with the following priority order:
+ 1. specified by timeAgo
+ 2. last sync timestamp for this directory
+ 3. directory creation time
+
`,
}
diff --git a/weed/command/s3.go b/weed/command/s3.go
index 19f70bdce..d7cd7818d 100644
--- a/weed/command/s3.go
+++ b/weed/command/s3.go
@@ -25,6 +25,7 @@ var (
type S3Options struct {
filer *string
+ bindIp *string
port *int
config *string
domainName *string
@@ -38,6 +39,7 @@ type S3Options struct {
func init() {
cmdS3.Run = runS3 // break init cycle
s3StandaloneOptions.filer = cmdS3.Flag.String("filer", "localhost:8888", "filer server address")
+ s3StandaloneOptions.bindIp = cmdS3.Flag.String("ip.bind", "", "ip address to bind to")
s3StandaloneOptions.port = cmdS3.Flag.Int("port", 8333, "s3 server http listen port")
s3StandaloneOptions.domainName = cmdS3.Flag.String("domainName", "", "suffix of the host name in comma separated list, {bucket}.{domainName}")
s3StandaloneOptions.config = cmdS3.Flag.String("config", "", "path to the config file")
@@ -189,7 +191,7 @@ func (s3opt *S3Options) startS3Server() bool {
httpS := &http.Server{Handler: router}
- listenAddress := fmt.Sprintf(":%d", *s3opt.port)
+ listenAddress := fmt.Sprintf("%s:%d", *s3opt.bindIp, *s3opt.port)
s3ApiListener, err := util.NewListener(listenAddress, time.Duration(10)*time.Second)
if err != nil {
glog.Fatalf("S3 API Server listener on %s error: %v", listenAddress, err)
@@ -197,6 +199,9 @@ func (s3opt *S3Options) startS3Server() bool {
if len(*s3opt.auditLogConfig) > 0 {
s3err.InitAuditLog(*s3opt.auditLogConfig)
+ if s3err.Logger != nil {
+ defer s3err.Logger.Close()
+ }
}
if *s3opt.tlsPrivateKey != "" {
diff --git a/weed/command/server.go b/weed/command/server.go
index 1a6393edf..0cb748381 100644
--- a/weed/command/server.go
+++ b/weed/command/server.go
@@ -180,6 +180,7 @@ func runServer(cmd *Command, args []string) bool {
filerOptions.masters = pb.ServerAddresses(*masterOptions.peers).ToAddresses()
filerOptions.ip = serverIp
filerOptions.bindIp = serverBindIp
+ s3Options.bindIp = serverBindIp
serverOptions.v.ip = serverIp
serverOptions.v.bindIp = serverBindIp
serverOptions.v.masters = pb.ServerAddresses(*masterOptions.peers).ToAddresses()
diff --git a/weed/filer/filechunk_manifest.go b/weed/filer/filechunk_manifest.go
index 32008271b..b6a64b30d 100644
--- a/weed/filer/filechunk_manifest.go
+++ b/weed/filer/filechunk_manifest.go
@@ -101,6 +101,15 @@ func fetchChunk(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string,
return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, true, 0, 0)
}
+func fetchChunkRange(lookupFileIdFn wdclient.LookupFileIdFunctionType, fileId string, cipherKey []byte, isGzipped bool, offset int64, size int) ([]byte, error) {
+ urlStrings, err := lookupFileIdFn(fileId)
+ if err != nil {
+ glog.Errorf("operation LookupFileId %s failed, err: %v", fileId, err)
+ return nil, err
+ }
+ return retriedFetchChunkData(urlStrings, cipherKey, isGzipped, false, offset, size)
+}
+
func retriedFetchChunkData(urlStrings []string, cipherKey []byte, isGzipped bool, isFullChunk bool, offset int64, size int) ([]byte, error) {
var err error
diff --git a/weed/filer/reader_at.go b/weed/filer/reader_at.go
index 458cf88be..5f58b870c 100644
--- a/weed/filer/reader_at.go
+++ b/weed/filer/reader_at.go
@@ -26,6 +26,7 @@ type ChunkReadAt struct {
chunkCache chunk_cache.ChunkCache
lastChunkFileId string
lastChunkData []byte
+ readerPattern *ReaderPattern
}
var _ = io.ReaderAt(&ChunkReadAt{})
@@ -88,10 +89,11 @@ func LookupFn(filerClient filer_pb.FilerClient) wdclient.LookupFileIdFunctionTyp
func NewChunkReaderAtFromClient(lookupFn wdclient.LookupFileIdFunctionType, chunkViews []*ChunkView, chunkCache chunk_cache.ChunkCache, fileSize int64) *ChunkReadAt {
return &ChunkReadAt{
- chunkViews: chunkViews,
- lookupFileId: lookupFn,
- chunkCache: chunkCache,
- fileSize: fileSize,
+ chunkViews: chunkViews,
+ lookupFileId: lookupFn,
+ chunkCache: chunkCache,
+ fileSize: fileSize,
+ readerPattern: NewReaderPattern(),
}
}
@@ -106,6 +108,8 @@ func (c *ChunkReadAt) ReadAt(p []byte, offset int64) (n int, err error) {
c.readerLock.Lock()
defer c.readerLock.Unlock()
+ c.readerPattern.MonitorReadAt(offset, len(p))
+
// glog.V(4).Infof("ReadAt [%d,%d) of total file size %d bytes %d chunk views", offset, offset+int64(len(p)), c.fileSize, len(c.chunkViews))
return c.doReadAt(p, offset)
}
@@ -171,7 +175,14 @@ func (c *ChunkReadAt) doReadAt(p []byte, offset int64) (n int, err error) {
func (c *ChunkReadAt) readChunkSlice(chunkView *ChunkView, nextChunkViews *ChunkView, offset, length uint64) ([]byte, error) {
- chunkSlice := c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length)
+ if c.readerPattern.IsRandomMode() {
+ return c.doFetchRangeChunkData(chunkView, offset, length)
+ }
+
+ var chunkSlice []byte
+ if chunkView.LogicOffset == 0 {
+ chunkSlice = c.chunkCache.GetChunkSlice(chunkView.FileId, offset, length)
+ }
if len(chunkSlice) > 0 {
return chunkSlice, nil
}
@@ -217,7 +228,10 @@ func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, erro
glog.V(4).Infof("readFromWholeChunkData %s offset %d [%d,%d) size at least %d", chunkView.FileId, chunkView.Offset, chunkView.LogicOffset, chunkView.LogicOffset+int64(chunkView.Size), chunkView.ChunkSize)
- data := c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+ var data []byte
+ if chunkView.LogicOffset == 0 {
+ data = c.chunkCache.GetChunk(chunkView.FileId, chunkView.ChunkSize)
+ }
if data != nil {
glog.V(4).Infof("cache hit %s [%d,%d)", chunkView.FileId, chunkView.LogicOffset-chunkView.Offset, chunkView.LogicOffset-chunkView.Offset+int64(len(data)))
} else {
@@ -226,7 +240,10 @@ func (c *ChunkReadAt) readOneWholeChunk(chunkView *ChunkView) (interface{}, erro
if err != nil {
return data, err
}
- c.chunkCache.SetChunk(chunkView.FileId, data)
+ if chunkView.LogicOffset == 0 {
+ // only cache the first chunk
+ c.chunkCache.SetChunk(chunkView.FileId, data)
+ }
}
return data, err
})
@@ -243,3 +260,15 @@ func (c *ChunkReadAt) doFetchFullChunkData(chunkView *ChunkView) ([]byte, error)
return data, err
}
+
+func (c *ChunkReadAt) doFetchRangeChunkData(chunkView *ChunkView, offset, length uint64) ([]byte, error) {
+
+ glog.V(4).Infof("+ doFetchFullChunkData %s", chunkView.FileId)
+
+ data, err := fetchChunkRange(c.lookupFileId, chunkView.FileId, chunkView.CipherKey, chunkView.IsGzipped, int64(offset), int(length))
+
+ glog.V(4).Infof("- doFetchFullChunkData %s", chunkView.FileId)
+
+ return data, err
+
+}
diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go
new file mode 100644
index 000000000..2bf18d141
--- /dev/null
+++ b/weed/filer/reader_pattern.go
@@ -0,0 +1,31 @@
+package filer
+
+type ReaderPattern struct {
+ isStreaming bool
+ lastReadOffset int64
+}
+
+// For streaming read: only cache the first chunk
+// For random read: only fetch the requested range, instead of the whole chunk
+
+func NewReaderPattern() *ReaderPattern {
+ return &ReaderPattern{
+ isStreaming: true,
+ lastReadOffset: 0,
+ }
+}
+
+func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) {
+ if rp.lastReadOffset > offset {
+ rp.isStreaming = false
+ }
+ rp.lastReadOffset = offset
+}
+
+func (rp *ReaderPattern) IsStreamingMode() bool {
+ return rp.isStreaming
+}
+
+func (rp *ReaderPattern) IsRandomMode() bool {
+ return !rp.isStreaming
+}
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 9a791e013..cedcf2d76 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -161,7 +161,7 @@ func (dir *Dir) Create(ctx context.Context, req *fuse.CreateRequest,
},
}
file.dirtyMetadata = true
- fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid, req.Flags&fuse.OpenWriteOnly > 0)
+ fh := dir.wfs.AcquireHandle(file, req.Uid, req.Gid)
return file, fh, nil
}
diff --git a/weed/filesys/dirty_pages_continuous.go b/weed/filesys/dirty_pages_continuous.go
index b7514a2eb..88b50ce41 100644
--- a/weed/filesys/dirty_pages_continuous.go
+++ b/weed/filesys/dirty_pages_continuous.go
@@ -3,6 +3,7 @@ package filesys
import (
"bytes"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"io"
"sync"
"time"
@@ -12,9 +13,8 @@ import (
)
type ContinuousDirtyPages struct {
- intervals *ContinuousIntervals
+ intervals *page_writer.ContinuousIntervals
f *File
- writeOnly bool
writeWaitGroup sync.WaitGroup
chunkAddLock sync.Mutex
lastErr error
@@ -22,11 +22,10 @@ type ContinuousDirtyPages struct {
replication string
}
-func newContinuousDirtyPages(file *File, writeOnly bool) *ContinuousDirtyPages {
+func newContinuousDirtyPages(file *File) *ContinuousDirtyPages {
dirtyPages := &ContinuousDirtyPages{
- intervals: &ContinuousIntervals{},
+ intervals: &page_writer.ContinuousIntervals{},
f: file,
- writeOnly: writeOnly,
}
return dirtyPages
}
@@ -107,7 +106,7 @@ func (pages *ContinuousDirtyPages) saveToStorage(reader io.Reader, offset int64,
defer pages.writeWaitGroup.Done()
reader = io.LimitReader(reader, size)
- chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset)
+ chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
pages.lastErr = err
@@ -148,13 +147,3 @@ func (pages *ContinuousDirtyPages) ReadDirtyDataAt(data []byte, startOffset int6
func (pages *ContinuousDirtyPages) GetStorageOptions() (collection, replication string) {
return pages.collection, pages.replication
}
-
-func (pages *ContinuousDirtyPages) SetWriteOnly(writeOnly bool) {
- if pages.writeOnly {
- pages.writeOnly = writeOnly
- }
-}
-
-func (pages *ContinuousDirtyPages) GetWriteOnly() (writeOnly bool) {
- return pages.writeOnly
-}
diff --git a/weed/filesys/dirty_pages_temp_file.go b/weed/filesys/dirty_pages_temp_file.go
index 9fa7c0c8e..6a22889dc 100644
--- a/weed/filesys/dirty_pages_temp_file.go
+++ b/weed/filesys/dirty_pages_temp_file.go
@@ -2,6 +2,7 @@ package filesys
import (
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"github.com/chrislusf/seaweedfs/weed/glog"
"github.com/chrislusf/seaweedfs/weed/pb/filer_pb"
"io"
@@ -13,8 +14,7 @@ import (
type TempFileDirtyPages struct {
f *File
tf *os.File
- writtenIntervals *WrittenContinuousIntervals
- writeOnly bool
+ writtenIntervals *page_writer.WrittenContinuousIntervals
writeWaitGroup sync.WaitGroup
pageAddLock sync.Mutex
chunkAddLock sync.Mutex
@@ -23,12 +23,11 @@ type TempFileDirtyPages struct {
replication string
}
-func newTempFileDirtyPages(file *File, writeOnly bool) *TempFileDirtyPages {
+func newTempFileDirtyPages(file *File) *TempFileDirtyPages {
tempFile := &TempFileDirtyPages{
f: file,
- writeOnly: writeOnly,
- writtenIntervals: &WrittenContinuousIntervals{},
+ writtenIntervals: &page_writer.WrittenContinuousIntervals{},
}
return tempFile
@@ -47,11 +46,11 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
return
}
pages.tf = tf
- pages.writtenIntervals.tempFile = tf
- pages.writtenIntervals.lastOffset = 0
+ pages.writtenIntervals.TempFile = tf
+ pages.writtenIntervals.LastOffset = 0
}
- writtenOffset := pages.writtenIntervals.lastOffset
+ writtenOffset := pages.writtenIntervals.LastOffset
dataSize := int64(len(data))
// glog.V(4).Infof("%s AddPage %v at %d [%d,%d)", pages.f.fullpath(), pages.tf.Name(), writtenOffset, offset, offset+dataSize)
@@ -60,7 +59,7 @@ func (pages *TempFileDirtyPages) AddPage(offset int64, data []byte) {
pages.lastErr = err
} else {
pages.writtenIntervals.AddInterval(writtenOffset, len(data), offset)
- pages.writtenIntervals.lastOffset += dataSize
+ pages.writtenIntervals.LastOffset += dataSize
}
// pages.writtenIntervals.debug()
@@ -79,8 +78,8 @@ func (pages *TempFileDirtyPages) FlushData() error {
defer pages.pageAddLock.Unlock()
if pages.tf != nil {
- pages.writtenIntervals.tempFile = nil
- pages.writtenIntervals.lists = nil
+ pages.writtenIntervals.TempFile = nil
+ pages.writtenIntervals.Lists = nil
pages.tf.Close()
os.Remove(pages.tf.Name())
@@ -95,7 +94,7 @@ func (pages *TempFileDirtyPages) saveExistingPagesToStorage() {
// glog.V(4).Infof("%v saveExistingPagesToStorage %d lists", pages.f.Name, len(pages.writtenIntervals.lists))
- for _, list := range pages.writtenIntervals.lists {
+ for _, list := range pages.writtenIntervals.Lists {
listStopOffset := list.Offset() + list.Size()
for uploadedOffset := int64(0); uploadedOffset < listStopOffset; uploadedOffset += pageSize {
start, stop := max(list.Offset(), uploadedOffset), min(listStopOffset, uploadedOffset+pageSize)
@@ -117,7 +116,7 @@ func (pages *TempFileDirtyPages) saveToStorage(reader io.Reader, offset int64, s
defer pages.writeWaitGroup.Done()
reader = io.LimitReader(reader, size)
- chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath(), pages.writeOnly)(reader, pages.f.Name, offset)
+ chunk, collection, replication, err := pages.f.wfs.saveDataAsChunk(pages.f.fullpath())(reader, pages.f.Name, offset)
if err != nil {
glog.V(0).Infof("%s saveToStorage [%d,%d): %v", pages.f.fullpath(), offset, offset+size, err)
pages.lastErr = err
@@ -145,13 +144,3 @@ func (pages *TempFileDirtyPages) ReadDirtyDataAt(data []byte, startOffset int64)
func (pages *TempFileDirtyPages) GetStorageOptions() (collection, replication string) {
return pages.collection, pages.replication
}
-
-func (pages *TempFileDirtyPages) SetWriteOnly(writeOnly bool) {
- if pages.writeOnly {
- pages.writeOnly = writeOnly
- }
-}
-
-func (pages *TempFileDirtyPages) GetWriteOnly() (writeOnly bool) {
- return pages.writeOnly
-}
diff --git a/weed/filesys/file.go b/weed/filesys/file.go
index f8fd7ad99..767841f9d 100644
--- a/weed/filesys/file.go
+++ b/weed/filesys/file.go
@@ -97,7 +97,7 @@ func (file *File) Open(ctx context.Context, req *fuse.OpenRequest, resp *fuse.Op
glog.V(4).Infof("file %v open %+v", file.fullpath(), req)
- handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid, req.Flags&fuse.OpenWriteOnly > 0)
+ handle := file.wfs.AcquireHandle(file, req.Uid, req.Gid)
resp.Handle = fuse.HandleID(handle.handle)
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 34affddb9..d92b17b5b 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -3,6 +3,7 @@ package filesys
import (
"context"
"fmt"
+ "github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
"io"
"math"
"net/http"
@@ -20,7 +21,7 @@ import (
type FileHandle struct {
// cache file has been written to
- dirtyPages DirtyPages
+ dirtyPages page_writer.DirtyPages
entryViewCache []filer.VisibleInterval
reader io.ReaderAt
contentType string
@@ -36,11 +37,11 @@ type FileHandle struct {
isDeleted bool
}
-func newFileHandle(file *File, uid, gid uint32, writeOnly bool) *FileHandle {
+func newFileHandle(file *File, uid, gid uint32) *FileHandle {
fh := &FileHandle{
f: file,
// dirtyPages: newContinuousDirtyPages(file, writeOnly),
- dirtyPages: newTempFileDirtyPages(file, writeOnly),
+ dirtyPages: newPageWriter(file, 2*1024*1024),
Uid: uid,
Gid: gid,
}
@@ -62,10 +63,11 @@ var _ = fs.HandleReleaser(&FileHandle{})
func (fh *FileHandle) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
- glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
fh.Lock()
defer fh.Unlock()
+ glog.V(4).Infof("%s read fh %d: [%d,%d) size %d resp.Data cap=%d", fh.f.fullpath(), fh.handle, req.Offset, req.Offset+int64(req.Size), req.Size, cap(resp.Data))
+
if req.Size <= 0 {
return nil
}
@@ -173,7 +175,7 @@ func (fh *FileHandle) Write(ctx context.Context, req *fuse.WriteRequest, resp *f
// write the request to volume servers
data := req.Data
- if len(data) <= 512 {
+ if len(data) <= 512 && req.Offset == 0 {
// fuse message cacheable size
data = make([]byte, len(req.Data))
copy(data, req.Data)
@@ -303,7 +305,7 @@ func (fh *FileHandle) doFlush(ctx context.Context, header fuse.Header) error {
manifestChunks, nonManifestChunks := filer.SeparateManifestChunks(entry.Chunks)
chunks, _ := filer.CompactFileChunks(fh.f.wfs.LookupFn(), nonManifestChunks)
- chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath(), fh.dirtyPages.GetWriteOnly()), chunks)
+ chunks, manifestErr := filer.MaybeManifestize(fh.f.wfs.saveDataAsChunk(fh.f.fullpath()), chunks)
if manifestErr != nil {
// not good, but should be ok
glog.V(0).Infof("MaybeManifestize: %v", manifestErr)
diff --git a/weed/filesys/page_writer.go b/weed/filesys/page_writer.go
new file mode 100644
index 000000000..9c9e38968
--- /dev/null
+++ b/weed/filesys/page_writer.go
@@ -0,0 +1,78 @@
+package filesys
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/filesys/page_writer"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+)
+
+type PageWriter struct {
+ f *File
+ collection string
+ replication string
+ chunkSize int64
+ writerPattern *WriterPattern
+
+ randomWriter page_writer.DirtyPages
+ streamWriter page_writer.DirtyPages
+}
+
+var (
+ _ = page_writer.DirtyPages(&PageWriter{})
+)
+
+func newPageWriter(file *File, chunkSize int64) *PageWriter {
+ pw := &PageWriter{
+ f: file,
+ chunkSize: chunkSize,
+ randomWriter: newTempFileDirtyPages(file),
+ streamWriter: newContinuousDirtyPages(file),
+ writerPattern: NewWriterPattern(file.Name, chunkSize),
+ }
+ return pw
+}
+
+func (pw *PageWriter) AddPage(offset int64, data []byte) {
+
+ glog.V(4).Infof("AddPage %v [%d, %d) streaming:%v", pw.f.fullpath(), offset, offset+int64(len(data)), pw.writerPattern.IsStreamingMode())
+
+ pw.writerPattern.MonitorWriteAt(offset, len(data))
+
+ chunkIndex := offset / pw.chunkSize
+ for i := chunkIndex; len(data) > 0; i++ {
+ writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset)
+ pw.addToOneChunk(i, offset, data[:writeSize])
+ offset += writeSize
+ data = data[writeSize:]
+ }
+}
+
+func (pw *PageWriter) addToOneChunk(chunkIndex, offset int64, data []byte) {
+ if chunkIndex > 0 {
+ if pw.writerPattern.IsStreamingMode() {
+ pw.streamWriter.AddPage(offset, data)
+ return
+ }
+ }
+ pw.randomWriter.AddPage(offset, data)
+}
+
+func (pw *PageWriter) FlushData() error {
+ if err := pw.streamWriter.FlushData(); err != nil {
+ return err
+ }
+ return pw.randomWriter.FlushData()
+}
+
+func (pw *PageWriter) ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64) {
+ glog.V(4).Infof("ReadDirtyDataAt %v [%d, %d)", pw.f.fullpath(), startOffset, startOffset+int64(len(data)))
+ m1 := pw.streamWriter.ReadDirtyDataAt(data, startOffset)
+ m2 := pw.randomWriter.ReadDirtyDataAt(data, startOffset)
+ return max(m1, m2)
+}
+
+func (pw *PageWriter) GetStorageOptions() (collection, replication string) {
+ if pw.writerPattern.IsStreamingMode() {
+ return pw.streamWriter.GetStorageOptions()
+ }
+ return pw.randomWriter.GetStorageOptions()
+}
diff --git a/weed/filesys/dirty_page_interval.go b/weed/filesys/page_writer/dirty_page_interval.go
index 304793340..6d73b8cd7 100644
--- a/weed/filesys/dirty_page_interval.go
+++ b/weed/filesys/page_writer/dirty_page_interval.go
@@ -1,4 +1,4 @@
-package filesys
+package page_writer
import (
"io"
diff --git a/weed/filesys/dirty_page_interval_test.go b/weed/filesys/page_writer/dirty_page_interval_test.go
index d02ad27fd..2a2a1df4d 100644
--- a/weed/filesys/dirty_page_interval_test.go
+++ b/weed/filesys/page_writer/dirty_page_interval_test.go
@@ -1,4 +1,4 @@
-package filesys
+package page_writer
import (
"bytes"
diff --git a/weed/filesys/dirty_pages.go b/weed/filesys/page_writer/dirty_pages.go
index 8505323ef..c18f847b7 100644
--- a/weed/filesys/dirty_pages.go
+++ b/weed/filesys/page_writer/dirty_pages.go
@@ -1,10 +1,8 @@
-package filesys
+package page_writer
type DirtyPages interface {
AddPage(offset int64, data []byte)
FlushData() error
ReadDirtyDataAt(data []byte, startOffset int64) (maxStop int64)
GetStorageOptions() (collection, replication string)
- SetWriteOnly(writeOnly bool)
- GetWriteOnly() (writeOnly bool)
}
diff --git a/weed/filesys/dirty_pages_temp_interval.go b/weed/filesys/page_writer/dirty_pages_temp_interval.go
index 42c4b5a3b..aeaf0ec6f 100644
--- a/weed/filesys/dirty_pages_temp_interval.go
+++ b/weed/filesys/page_writer/dirty_pages_temp_interval.go
@@ -1,4 +1,4 @@
-package filesys
+package page_writer
import (
"io"
@@ -20,9 +20,9 @@ type WrittenIntervalLinkedList struct {
}
type WrittenContinuousIntervals struct {
- tempFile *os.File
- lastOffset int64
- lists []*WrittenIntervalLinkedList
+ TempFile *os.File
+ LastOffset int64
+ Lists []*WrittenIntervalLinkedList
}
func (list *WrittenIntervalLinkedList) Offset() int64 {
@@ -65,7 +65,7 @@ func (list *WrittenIntervalLinkedList) ReadData(buf []byte, start, stop int64) {
}
func (c *WrittenContinuousIntervals) TotalSize() (total int64) {
- for _, list := range c.lists {
+ for _, list := range c.Lists {
total += list.Size()
}
return
@@ -98,7 +98,7 @@ func (list *WrittenIntervalLinkedList) subList(start, stop int64) *WrittenInterv
func (c *WrittenContinuousIntervals) debug() {
log.Printf("++")
- for _, l := range c.lists {
+ for _, l := range c.Lists {
log.Printf("++++")
for t := l.Head; ; t = t.Next {
log.Printf("[%d,%d) => [%d,%d) %d", t.DataOffset, t.DataOffset+t.Size, t.TempOffset, t.TempOffset+t.Size, t.Size)
@@ -116,8 +116,8 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int,
interval := &WrittenIntervalNode{DataOffset: dataOffset, TempOffset: tempOffset, Size: int64(dataSize)}
// append to the tail and return
- if len(c.lists) == 1 {
- lastSpan := c.lists[0]
+ if len(c.Lists) == 1 {
+ lastSpan := c.Lists[0]
if lastSpan.Tail.DataOffset+lastSpan.Tail.Size == dataOffset {
lastSpan.addNodeToTail(interval)
return
@@ -125,7 +125,7 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int,
}
var newLists []*WrittenIntervalLinkedList
- for _, list := range c.lists {
+ for _, list := range c.Lists {
// if list is to the left of new interval, add to the new list
if list.Tail.DataOffset+list.Tail.Size <= interval.DataOffset {
newLists = append(newLists, list)
@@ -147,18 +147,18 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int,
// skip anything that is fully overwritten by the new interval
}
- c.lists = newLists
+ c.Lists = newLists
// add the new interval to the lists, connecting neighbor lists
var prevList, nextList *WrittenIntervalLinkedList
- for _, list := range c.lists {
+ for _, list := range c.Lists {
if list.Head.DataOffset == interval.DataOffset+interval.Size {
nextList = list
break
}
}
- for _, list := range c.lists {
+ for _, list := range c.Lists {
if list.Head.DataOffset+list.Size() == dataOffset {
list.addNodeToTail(interval)
prevList = list
@@ -176,8 +176,8 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int,
nextList.addNodeToHead(interval)
}
if prevList == nil && nextList == nil {
- c.lists = append(c.lists, &WrittenIntervalLinkedList{
- tempFile: c.tempFile,
+ c.Lists = append(c.Lists, &WrittenIntervalLinkedList{
+ tempFile: c.TempFile,
Head: interval,
Tail: interval,
})
@@ -189,7 +189,7 @@ func (c *WrittenContinuousIntervals) AddInterval(tempOffset int64, dataSize int,
func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenIntervalLinkedList {
var maxSize int64
maxIndex := -1
- for k, list := range c.lists {
+ for k, list := range c.Lists {
if maxSize <= list.Size() {
maxSize = list.Size()
maxIndex = k
@@ -199,16 +199,16 @@ func (c *WrittenContinuousIntervals) RemoveLargestIntervalLinkedList() *WrittenI
return nil
}
- t := c.lists[maxIndex]
- t.tempFile = c.tempFile
- c.lists = append(c.lists[0:maxIndex], c.lists[maxIndex+1:]...)
+ t := c.Lists[maxIndex]
+ t.tempFile = c.TempFile
+ c.Lists = append(c.Lists[0:maxIndex], c.Lists[maxIndex+1:]...)
return t
}
func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedList) {
index := -1
- for k, list := range c.lists {
+ for k, list := range c.Lists {
if list.Offset() == target.Offset() {
index = k
}
@@ -217,12 +217,12 @@ func (c *WrittenContinuousIntervals) removeList(target *WrittenIntervalLinkedLis
return
}
- c.lists = append(c.lists[0:index], c.lists[index+1:]...)
+ c.Lists = append(c.Lists[0:index], c.Lists[index+1:]...)
}
func (c *WrittenContinuousIntervals) ReadDataAt(data []byte, startOffset int64) (maxStop int64) {
- for _, list := range c.lists {
+ for _, list := range c.Lists {
start := max(startOffset, list.Offset())
stop := min(startOffset+int64(len(data)), list.Offset()+list.Size())
if start < stop {
@@ -287,3 +287,16 @@ func (f *FileSectionReader) Read(p []byte) (n int, err error) {
}
return
}
+
+func max(x, y int64) int64 {
+ if x > y {
+ return x
+ }
+ return y
+}
+func min(x, y int64) int64 {
+ if x < y {
+ return x
+ }
+ return y
+}
diff --git a/weed/filesys/page_writer/page_chunk.go b/weed/filesys/page_writer/page_chunk.go
new file mode 100644
index 000000000..b21e6acca
--- /dev/null
+++ b/weed/filesys/page_writer/page_chunk.go
@@ -0,0 +1 @@
+package page_writer
diff --git a/weed/filesys/page_writer_pattern.go b/weed/filesys/page_writer_pattern.go
new file mode 100644
index 000000000..42ca3d969
--- /dev/null
+++ b/weed/filesys/page_writer_pattern.go
@@ -0,0 +1,38 @@
+package filesys
+
+type WriterPattern struct {
+ isStreaming bool
+ lastWriteOffset int64
+ chunkSize int64
+ fileName string
+}
+
+// For streaming write: only cache the first chunk
+// For random write: fall back to temp file approach
+// writes can only change from streaming mode to non-streaming mode
+
+func NewWriterPattern(fileName string, chunkSize int64) *WriterPattern {
+ return &WriterPattern{
+ isStreaming: true,
+ lastWriteOffset: 0,
+ chunkSize: chunkSize,
+ fileName: fileName,
+ }
+}
+
+func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) {
+ if rp.lastWriteOffset == 0 {
+ }
+ if rp.lastWriteOffset > offset {
+ rp.isStreaming = false
+ }
+ rp.lastWriteOffset = offset
+}
+
+func (rp *WriterPattern) IsStreamingMode() bool {
+ return rp.isStreaming
+}
+
+func (rp *WriterPattern) IsRandomMode() bool {
+ return !rp.isStreaming
+}
diff --git a/weed/filesys/wfs.go b/weed/filesys/wfs.go
index 92f6bae38..aa4f9dacd 100644
--- a/weed/filesys/wfs.go
+++ b/weed/filesys/wfs.go
@@ -148,7 +148,7 @@ func (wfs *WFS) Root() (fs.Node, error) {
return wfs.root, nil
}
-func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (fileHandle *FileHandle) {
+func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32) (fileHandle *FileHandle) {
fullpath := file.fullpath()
glog.V(4).Infof("AcquireHandle %s uid=%d gid=%d", fullpath, uid, gid)
@@ -160,7 +160,6 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (file
if found && existingHandle != nil && existingHandle.f.isOpen > 0 {
existingHandle.f.isOpen++
wfs.handlesLock.Unlock()
- existingHandle.dirtyPages.SetWriteOnly(writeOnly)
glog.V(4).Infof("Reuse AcquiredHandle %s open %d", fullpath, existingHandle.f.isOpen)
return existingHandle
}
@@ -168,7 +167,7 @@ func (wfs *WFS) AcquireHandle(file *File, uid, gid uint32, writeOnly bool) (file
entry, _ := file.maybeLoadEntry(context.Background())
file.entry = entry
- fileHandle = newFileHandle(file, uid, gid, writeOnly)
+ fileHandle = newFileHandle(file, uid, gid)
wfs.handlesLock.Lock()
file.isOpen++
diff --git a/weed/filesys/wfs_write.go b/weed/filesys/wfs_write.go
index 3d08cb5e2..61a463e88 100644
--- a/weed/filesys/wfs_write.go
+++ b/weed/filesys/wfs_write.go
@@ -13,7 +13,7 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.SaveDataAsChunkFunctionType {
+func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath) filer.SaveDataAsChunkFunctionType {
return func(reader io.Reader, filename string, offset int64) (chunk *filer_pb.FileChunk, collection, replication string, err error) {
var fileId, host string
@@ -74,7 +74,7 @@ func (wfs *WFS) saveDataAsChunk(fullPath util.FullPath, writeOnly bool) filer.Sa
return nil, "", "", fmt.Errorf("upload result: %v", uploadResult.Error)
}
- if !writeOnly {
+ if offset == 0 {
wfs.chunkCache.SetChunk(fileId, data)
}
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index a73db81ec..0d46ad7ca 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -203,33 +203,44 @@ func (iam *IdentityAccessManagement) authRequest(r *http.Request, action Action)
var identity *Identity
var s3Err s3err.ErrorCode
var found bool
+ var authType string
switch getRequestAuthType(r) {
case authTypeStreamingSigned:
return identity, s3err.ErrNone
case authTypeUnknown:
glog.V(3).Infof("unknown auth type")
+ r.Header.Set(xhttp.AmzAuthType, "Unknown")
return identity, s3err.ErrAccessDenied
case authTypePresignedV2, authTypeSignedV2:
glog.V(3).Infof("v2 auth type")
identity, s3Err = iam.isReqAuthenticatedV2(r)
+ authType = "SigV2"
case authTypeSigned, authTypePresigned:
glog.V(3).Infof("v4 auth type")
identity, s3Err = iam.reqSignatureV4Verify(r)
+ authType = "SigV4"
case authTypePostPolicy:
glog.V(3).Infof("post policy auth type")
+ r.Header.Set(xhttp.AmzAuthType, "PostPolicy")
return identity, s3err.ErrNone
case authTypeJWT:
glog.V(3).Infof("jwt auth type")
+ r.Header.Set(xhttp.AmzAuthType, "Jwt")
return identity, s3err.ErrNotImplemented
case authTypeAnonymous:
+ authType = "Anonymous"
identity, found = iam.lookupAnonymous()
if !found {
+ r.Header.Set(xhttp.AmzAuthType, authType)
return identity, s3err.ErrAccessDenied
}
default:
return identity, s3err.ErrNotImplemented
}
+ if len(authType) > 0 {
+ r.Header.Set(xhttp.AmzAuthType, authType)
+ }
if s3Err != s3err.ErrNone {
return identity, s3Err
}
@@ -250,33 +261,45 @@ func (iam *IdentityAccessManagement) authUser(r *http.Request) (*Identity, s3err
var identity *Identity
var s3Err s3err.ErrorCode
var found bool
+ var authType string
switch getRequestAuthType(r) {
case authTypeStreamingSigned:
return identity, s3err.ErrNone
case authTypeUnknown:
glog.V(3).Infof("unknown auth type")
+ r.Header.Set(xhttp.AmzAuthType, "Unknown")
return identity, s3err.ErrAccessDenied
case authTypePresignedV2, authTypeSignedV2:
glog.V(3).Infof("v2 auth type")
identity, s3Err = iam.isReqAuthenticatedV2(r)
+ authType = "SigV2"
case authTypeSigned, authTypePresigned:
glog.V(3).Infof("v4 auth type")
identity, s3Err = iam.reqSignatureV4Verify(r)
+ authType = "SigV4"
case authTypePostPolicy:
glog.V(3).Infof("post policy auth type")
+ r.Header.Set(xhttp.AmzAuthType, "PostPolicy")
return identity, s3err.ErrNone
case authTypeJWT:
glog.V(3).Infof("jwt auth type")
+ r.Header.Set(xhttp.AmzAuthType, "Jwt")
return identity, s3err.ErrNotImplemented
case authTypeAnonymous:
+ authType = "Anonymous"
identity, found = iam.lookupAnonymous()
if !found {
+ r.Header.Set(xhttp.AmzAuthType, authType)
return identity, s3err.ErrAccessDenied
}
default:
return identity, s3err.ErrNotImplemented
}
+ if len(authType) > 0 {
+ r.Header.Set(xhttp.AmzAuthType, authType)
+ }
+
glog.V(3).Infof("auth error: %v", s3Err)
if s3Err != s3err.ErrNone {
return identity, s3Err
diff --git a/weed/s3api/http/header.go b/weed/s3api/http/header.go
index 7579cf312..d63d50443 100644
--- a/weed/s3api/http/header.go
+++ b/weed/s3api/http/header.go
@@ -38,6 +38,7 @@ const (
// Non-Standard S3 HTTP request constants
const (
AmzIdentityId = "s3-identity-id"
+ AmzAuthType = "s3-auth-type"
AmzIsAdmin = "s3-is-admin" // only set to http request header as a context
)
@@ -51,3 +52,12 @@ func GetBucketAndObject(r *http.Request) (bucket, object string) {
return
}
+
+var PassThroughHeaders = map[string]string{
+ "response-cache-control": "Cache-Control",
+ "response-content-disposition": "Content-Disposition",
+ "response-content-encoding": "Content-Encoding",
+ "response-content-language": "Content-Language",
+ "response-content-type": "Content-Type",
+ "response-expires": "Expires",
+}
diff --git a/weed/s3api/s3api_object_handlers.go b/weed/s3api/s3api_object_handlers.go
index 6d1ec303e..2ac9c8102 100644
--- a/weed/s3api/s3api_object_handlers.go
+++ b/weed/s3api/s3api_object_handlers.go
@@ -261,7 +261,7 @@ func (s3a *S3ApiServer) DeleteMultipleObjectsHandler(w http.ResponseWriter, r *h
}
if auditLog != nil {
auditLog.Key = entryName
- s3err.PostAccessLog(auditLog)
+ s3err.PostAccessLog(*auditLog)
}
}
@@ -306,15 +306,6 @@ func (s3a *S3ApiServer) doDeleteEmptyDirectories(client filer_pb.SeaweedFilerCli
return
}
-var passThroughHeaders = []string{
- "response-cache-control",
- "response-content-disposition",
- "response-content-encoding",
- "response-content-language",
- "response-content-type",
- "response-expires",
-}
-
func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, destUrl string, responseFn func(proxyResponse *http.Response, w http.ResponseWriter) (statusCode int)) {
glog.V(3).Infof("s3 proxying %s to %s", r.Method, destUrl)
@@ -328,25 +319,14 @@ func (s3a *S3ApiServer) proxyToFiler(w http.ResponseWriter, r *http.Request, des
}
proxyReq.Header.Set("X-Forwarded-For", r.RemoteAddr)
-
- for header, values := range r.Header {
- // handle s3 related headers
- passed := false
- for _, h := range passThroughHeaders {
- if strings.ToLower(header) == h && len(values) > 0 {
- proxyReq.Header.Add(header[len("response-"):], values[0])
- passed = true
- break
- }
- }
- if passed {
- continue
- }
- // handle other headers
- for _, value := range values {
- proxyReq.Header.Add(header, value)
+ for k, v := range r.URL.Query() {
+ if _, ok := xhttp.PassThroughHeaders[strings.ToLower(k)]; ok {
+ proxyReq.Header[k] = v
}
}
+ for header, values := range r.Header {
+ proxyReq.Header[header] = values
+ }
resp, postErr := client.Do(proxyReq)
diff --git a/weed/s3api/s3api_status_handlers.go b/weed/s3api/s3api_status_handlers.go
index 2ee6c37b2..fafb6ac2f 100644
--- a/weed/s3api/s3api_status_handlers.go
+++ b/weed/s3api/s3api_status_handlers.go
@@ -1,8 +1,11 @@
package s3api
-import "net/http"
+import (
+ "github.com/chrislusf/seaweedfs/weed/s3api/s3err"
+ "net/http"
+)
func (s3a *S3ApiServer) StatusHandler(w http.ResponseWriter, r *http.Request) {
// write out the response code and content type header
- writeSuccessResponseEmpty(w, r)
+ s3err.WriteResponse(w, r, http.StatusOK, []byte{}, "")
}
diff --git a/weed/s3api/s3err/audit_fluent.go b/weed/s3api/s3err/audit_fluent.go
index bf935c8d8..fcc5f9a0f 100644
--- a/weed/s3api/s3err/audit_fluent.go
+++ b/weed/s3api/s3err/audit_fluent.go
@@ -48,23 +48,35 @@ type AccessLogHTTP struct {
const tag = "s3.access"
var (
- Logger *fluent.Fluent
- hostname = os.Getenv("HOSTNAME")
+ Logger *fluent.Fluent
+ hostname = os.Getenv("HOSTNAME")
+ environment = os.Getenv("ENVIRONMENT")
)
func InitAuditLog(config string) {
configContent, readErr := os.ReadFile(config)
if readErr != nil {
- glog.Fatalf("fail to read fluent config %s : %v", config, readErr)
+ glog.Errorf("fail to read fluent config %s : %v", config, readErr)
+ return
+ }
+ fluentConfig := &fluent.Config{}
+ if err := json.Unmarshal(configContent, fluentConfig); err != nil {
+ glog.Errorf("fail to parse fluent config %s : %v", string(configContent), err)
+ return
}
- var fluentConfig fluent.Config
- if err := json.Unmarshal(configContent, &fluentConfig); err != nil {
- glog.Fatalf("fail to parse fluent config %s : %v", config, err)
+ if len(fluentConfig.TagPrefix) == 0 && len(environment) > 0 {
+ fluentConfig.TagPrefix = environment
+ }
+ fluentConfig.Async = true
+ fluentConfig.AsyncResultCallback = func(data []byte, err error) {
+ if err != nil {
+ glog.Warning("Error while posting log: ", err)
+ }
}
var err error
- Logger, err = fluent.New(fluentConfig)
+ Logger, err = fluent.New(*fluentConfig)
if err != nil {
- glog.Fatalf("fail to load fluent config: %v", err)
+ glog.Errorf("fail to load fluent config: %v", err)
}
}
@@ -131,23 +143,24 @@ func GetAccessLog(r *http.Request, HTTPStatusCode int, s3errCode ErrorCode) *Acc
if len(remoteIP) == 0 {
remoteIP = r.RemoteAddr
}
- hostHeader := r.Header.Get("Host")
+ hostHeader := r.Header.Get("X-Forwarded-Host")
if len(hostHeader) == 0 {
- hostHeader = r.URL.Hostname()
+ hostHeader = r.Host
}
return &AccessLog{
- HostHeader: hostHeader,
- RequestID: r.Header.Get("X-Request-ID"),
- RemoteIP: remoteIP,
- Requester: r.Header.Get(xhttp.AmzIdentityId),
- UserAgent: r.Header.Get("UserAgent"),
- HostId: hostname,
- Bucket: bucket,
- HTTPStatus: HTTPStatusCode,
- Time: time.Now().Unix(),
- Key: key,
- Operation: getOperation(key, r),
- ErrorCode: errorCode,
+ HostHeader: hostHeader,
+ RequestID: r.Header.Get("X-Request-ID"),
+ RemoteIP: remoteIP,
+ Requester: r.Header.Get(xhttp.AmzIdentityId),
+ SignatureVersion: r.Header.Get(xhttp.AmzAuthType),
+ UserAgent: r.Header.Get("user-agent"),
+ HostId: hostname,
+ Bucket: bucket,
+ HTTPStatus: HTTPStatusCode,
+ Time: time.Now().Unix(),
+ Key: key,
+ Operation: getOperation(key, r),
+ ErrorCode: errorCode,
}
}
@@ -160,11 +173,11 @@ func PostLog(r *http.Request, HTTPStatusCode int, errorCode ErrorCode) {
}
}
-func PostAccessLog(log *AccessLog) {
- if Logger == nil || log == nil {
+func PostAccessLog(log AccessLog) {
+ if Logger == nil || len(log.Key) == 0 {
return
}
- if err := Logger.Post(tag, *log); err != nil {
+ if err := Logger.Post(tag, log); err != nil {
glog.Warning("Error while posting log: ", err)
}
}
diff --git a/weed/server/common.go b/weed/server/common.go
index 16213689d..ba4d13456 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ xhttp "github.com/chrislusf/seaweedfs/weed/s3api/http"
"io"
"io/fs"
"mime/multipart"
@@ -250,13 +251,16 @@ func handleStaticResources2(r *mux.Router) {
r.PathPrefix("/seaweedfsstatic/").Handler(http.StripPrefix("/seaweedfsstatic", http.FileServer(http.FS(StaticFS))))
}
-func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, filename string) {
- responseContentDisposition := r.FormValue("response-content-disposition")
- if responseContentDisposition != "" {
- w.Header().Set("Content-Disposition", responseContentDisposition)
- return
+func adjustPassthroughHeaders(w http.ResponseWriter, r *http.Request, filename string) {
+ for header, values := range r.Header {
+ if normalizedHeader, ok := xhttp.PassThroughHeaders[strings.ToLower(header)]; ok {
+ w.Header()[normalizedHeader] = values
+ }
}
- if w.Header().Get("Content-Disposition") != "" {
+ adjustHeaderContentDisposition(w, r, filename)
+}
+func adjustHeaderContentDisposition(w http.ResponseWriter, r *http.Request, filename string) {
+ if contentDisposition := w.Header().Get("Content-Disposition"); contentDisposition != "" {
return
}
if filename != "" {
diff --git a/weed/server/filer_server_handlers_read.go b/weed/server/filer_server_handlers_read.go
index 86e4af586..ac6aea056 100644
--- a/weed/server/filer_server_handlers_read.go
+++ b/weed/server/filer_server_handlers_read.go
@@ -130,7 +130,7 @@ func (fs *FilerServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request)
setEtag(w, etag)
filename := entry.Name()
- adjustHeaderContentDisposition(w, r, filename)
+ adjustPassthroughHeaders(w, r, filename)
totalSize := int64(entry.Size())
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index f897ffe7d..cbc0aa337 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -213,17 +213,7 @@ func (ms *MasterServer) startAdminScripts() {
v := util.GetViper()
adminScripts := v.GetString("master.maintenance.scripts")
if adminScripts == "" {
- adminScripts = `
- lock
- ec.encode -fullPercent=95 -quietFor=1h
- ec.rebuild -force
- ec.balance -force
- volume.deleteEmpty -quietFor=24h -force
- volume.balance -force
- volume.fix.replication
- s3.clean.uploads -timeAgo=24h
- unlock
- `
+ return
}
glog.V(0).Infof("adminScripts: %v", adminScripts)
diff --git a/weed/server/volume_server_handlers_read.go b/weed/server/volume_server_handlers_read.go
index 5d12108d3..5ce2278bf 100644
--- a/weed/server/volume_server_handlers_read.go
+++ b/weed/server/volume_server_handlers_read.go
@@ -29,8 +29,6 @@ var fileNameEscaper = strings.NewReplacer(`\`, `\\`, `"`, `\"`)
func (vs *VolumeServer) GetOrHeadHandler(w http.ResponseWriter, r *http.Request) {
- glog.V(9).Info(r.Method + " " + r.URL.Path + " " + r.Header.Get("Range"))
-
stats.VolumeServerRequestCounter.WithLabelValues("get").Inc()
start := time.Now()
defer func() { stats.VolumeServerRequestHistogram.WithLabelValues("get").Observe(time.Since(start).Seconds()) }()
@@ -301,7 +299,7 @@ func writeResponseContent(filename, mimeType string, rs io.ReadSeeker, w http.Re
}
w.Header().Set("Accept-Ranges", "bytes")
- adjustHeaderContentDisposition(w, r, filename)
+ adjustPassthroughHeaders(w, r, filename)
if r.Method == "HEAD" {
w.Header().Set("Content-Length", strconv.FormatInt(totalSize, 10))
diff --git a/weed/storage/disk_location.go b/weed/storage/disk_location.go
index 98042f3c8..af4ec1eb4 100644
--- a/weed/storage/disk_location.go
+++ b/weed/storage/disk_location.go
@@ -334,12 +334,12 @@ func (l *DiskLocation) Close() {
}
func (l *DiskLocation) LocateVolume(vid needle.VolumeId) (os.DirEntry, bool) {
- println("LocateVolume", vid, "on", l.Directory)
+ // println("LocateVolume", vid, "on", l.Directory)
if dirEntries, err := os.ReadDir(l.Directory); err == nil {
for _, entry := range dirEntries {
- println("checking", entry.Name(), "...")
+ // println("checking", entry.Name(), "...")
volId, _, err := volumeIdFromFileName(entry.Name())
- println("volId", volId, "err", err)
+ // println("volId", volId, "err", err)
if vid == volId && err == nil {
return entry, true
}
diff --git a/weed/topology/topology.go b/weed/topology/topology.go
index ad440e244..207c89ad7 100644
--- a/weed/topology/topology.go
+++ b/weed/topology/topology.go
@@ -206,7 +206,7 @@ func (t *Topology) RegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
vl.EnsureCorrectWritables(&v)
}
func (t *Topology) UnRegisterVolumeLayout(v storage.VolumeInfo, dn *DataNode) {
- glog.Infof("removing volume info: %+v", v)
+ glog.Infof("removing volume info: %+v from %v", v, dn.id)
diskType := types.ToDiskType(v.DiskType)
volumeLayout := t.GetVolumeLayout(v.Collection, v.ReplicaPlacement, v.Ttl, diskType)
volumeLayout.UnRegisterVolume(&v, dn)
diff --git a/weed/topology/volume_layout.go b/weed/topology/volume_layout.go
index a70e9aec9..dbfb439bd 100644
--- a/weed/topology/volume_layout.go
+++ b/weed/topology/volume_layout.go
@@ -281,7 +281,7 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
//glog.V(0).Infoln("No more writable volumes!")
return nil, 0, nil, errors.New("No more writable volumes!")
}
- if option.DataCenter == "" {
+ if option.DataCenter == "" && option.Rack == "" && option.DataNode == "" {
vid := vl.writables[rand.Intn(lenWriters)]
locationList := vl.vid2location[vid]
if locationList != nil {
@@ -295,17 +295,18 @@ func (vl *VolumeLayout) PickForWrite(count uint64, option *VolumeGrowOption) (*n
for _, v := range vl.writables {
volumeLocationList := vl.vid2location[v]
for _, dn := range volumeLocationList.list {
- if dn.GetDataCenter().Id() == NodeId(option.DataCenter) {
- if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
- continue
- }
- if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
- continue
- }
- counter++
- if rand.Intn(counter) < 1 {
- vid, locationList = v, volumeLocationList.Copy()
- }
+ if option.DataCenter != "" && dn.GetDataCenter().Id() != NodeId(option.DataCenter) {
+ continue
+ }
+ if option.Rack != "" && dn.GetRack().Id() != NodeId(option.Rack) {
+ continue
+ }
+ if option.DataNode != "" && dn.Id() != NodeId(option.DataNode) {
+ continue
+ }
+ counter++
+ if rand.Intn(counter) < 1 {
+ vid, locationList = v, volumeLocationList.Copy()
}
}
}