diff options
| -rw-r--r-- | weed/filer/meta_aggregator.go | 6 | ||||
| -rw-r--r-- | weed/filer/reader_cache.go | 6 | ||||
| -rw-r--r-- | weed/filer/reader_pattern.go | 29 | ||||
| -rw-r--r-- | weed/filer/s3iam_conf.go | 20 | ||||
| -rw-r--r-- | weed/filer/s3iam_conf_test.go | 93 | ||||
| -rw-r--r-- | weed/mount/page_writer.go | 4 | ||||
| -rw-r--r-- | weed/mount/page_writer_pattern.go | 38 | ||||
| -rw-r--r-- | weed/mount/weedfs_file_write.go | 2 | ||||
| -rw-r--r-- | weed/s3api/auth_credentials.go | 5 | ||||
| -rw-r--r-- | weed/shell/command_s3_configure.go | 7 |
10 files changed, 154 insertions, 56 deletions
diff --git a/weed/filer/meta_aggregator.go b/weed/filer/meta_aggregator.go index 1a805bde3..11f0d9184 100644 --- a/weed/filer/meta_aggregator.go +++ b/weed/filer/meta_aggregator.go @@ -57,7 +57,7 @@ func (ma *MetaAggregator) OnPeerUpdate(update *master_pb.ClusterNodeUpdate, star if update.IsAdd { // every filer should subscribe to a new filer if ma.setActive(address, true) { - go ma.loopSubscribeToOnefiler(ma.filer, ma.self, address, startFrom) + go ma.loopSubscribeToOneFiler(ma.filer, ma.self, address, startFrom) } } else { ma.setActive(address, false) @@ -89,10 +89,10 @@ func (ma *MetaAggregator) isActive(address pb.ServerAddress) (isActive bool) { return count > 0 && isActive } -func (ma *MetaAggregator) loopSubscribeToOnefiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time) { +func (ma *MetaAggregator) loopSubscribeToOneFiler(f *Filer, self pb.ServerAddress, peer pb.ServerAddress, startFrom time.Time) { lastTsNs := startFrom.UnixNano() for { - glog.V(0).Infof("loopSubscribeToOnefiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs) + glog.V(0).Infof("loopSubscribeToOneFiler read %s start from %v %d", peer, time.Unix(0, lastTsNs), lastTsNs) nextLastTsNs, err := ma.doSubscribeToOneFiler(f, self, peer, lastTsNs) if !ma.isActive(peer) { glog.V(0).Infof("stop subscribing remote %s meta change", peer) diff --git a/weed/filer/reader_cache.go b/weed/filer/reader_cache.go index c319f6c78..4c92f71c8 100644 --- a/weed/filer/reader_cache.go +++ b/weed/filer/reader_cache.go @@ -18,7 +18,7 @@ type ReaderCache struct { } type SingleChunkCacher struct { - sync.RWMutex + sync.Mutex cond *sync.Cond parent *ReaderCache chunkFileId string @@ -183,8 +183,8 @@ func (s *SingleChunkCacher) destroy() { } func (s *SingleChunkCacher) readChunkAt(buf []byte, offset int64) (int, error) { - s.RLock() - defer s.RUnlock() + s.Lock() + defer s.Unlock() for s.completedTime.IsZero() { s.cond.Wait() diff --git a/weed/filer/reader_pattern.go b/weed/filer/reader_pattern.go index b860bc577..ec73c59a2 100644 --- a/weed/filer/reader_pattern.go +++ b/weed/filer/reader_pattern.go @@ -1,8 +1,8 @@ package filer type ReaderPattern struct { - isStreaming bool - lastReadOffset int64 + isSequentialCounter int64 + lastReadStopOffset int64 } // For streaming read: only cache the first chunk @@ -10,29 +10,20 @@ type ReaderPattern struct { func NewReaderPattern() *ReaderPattern { return &ReaderPattern{ - isStreaming: true, - lastReadOffset: -1, + isSequentialCounter: 0, + lastReadStopOffset: 0, } } func (rp *ReaderPattern) MonitorReadAt(offset int64, size int) { - isStreaming := true - if rp.lastReadOffset > offset { - isStreaming = false + if rp.lastReadStopOffset == offset { + rp.isSequentialCounter++ + } else { + rp.isSequentialCounter-- } - if rp.lastReadOffset == -1 { - if offset != 0 { - isStreaming = false - } - } - rp.lastReadOffset = offset - rp.isStreaming = isStreaming -} - -func (rp *ReaderPattern) IsStreamingMode() bool { - return rp.isStreaming + rp.lastReadStopOffset = offset + int64(size) } func (rp *ReaderPattern) IsRandomMode() bool { - return !rp.isStreaming + return rp.isSequentialCounter >= 0 } diff --git a/weed/filer/s3iam_conf.go b/weed/filer/s3iam_conf.go index 891bf925b..d8f3c2445 100644 --- a/weed/filer/s3iam_conf.go +++ b/weed/filer/s3iam_conf.go @@ -2,9 +2,12 @@ package filer import ( "bytes" + "fmt" + "io" + + "github.com/chrislusf/seaweedfs/weed/pb/iam_pb" "github.com/golang/protobuf/jsonpb" "github.com/golang/protobuf/proto" - "io" ) func ParseS3ConfigurationFromBytes[T proto.Message](content []byte, config T) error { @@ -23,3 +26,18 @@ func ProtoToText(writer io.Writer, config proto.Message) error { return m.Marshal(writer, config) } + +// CheckDuplicateAccessKey returns an error message when s3cfg has duplicate access keys +func CheckDuplicateAccessKey(s3cfg *iam_pb.S3ApiConfiguration) error { + accessKeySet := make(map[string]string) + for _, ident := range s3cfg.Identities { + for _, cred := range ident.Credentials { + if userName, found := accessKeySet[cred.AccessKey]; !found { + accessKeySet[cred.AccessKey] = ident.Name + } else { + return fmt.Errorf("duplicate accessKey[%s], already configured in user[%s]", cred.AccessKey, userName) + } + } + } + return nil +} diff --git a/weed/filer/s3iam_conf_test.go b/weed/filer/s3iam_conf_test.go index da7d9c9f1..bd9eb85ae 100644 --- a/weed/filer/s3iam_conf_test.go +++ b/weed/filer/s3iam_conf_test.go @@ -2,9 +2,10 @@ package filer import ( "bytes" - . "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" "testing" + . "github.com/chrislusf/seaweedfs/weed/s3api/s3_constants" + "github.com/chrislusf/seaweedfs/weed/pb/iam_pb" "github.com/stretchr/testify/assert" @@ -55,3 +56,93 @@ func TestS3Conf(t *testing.T) { assert.Equal(t, "some_access_key1", s3ConfSaved.Identities[0].Credentials[0].AccessKey) assert.Equal(t, "some_secret_key2", s3ConfSaved.Identities[1].Credentials[0].SecretKey) } + +func TestCheckDuplicateAccessKey(t *testing.T) { + var tests = []struct { + s3cfg *iam_pb.S3ApiConfiguration + err string + }{ + { + &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + { + Name: "some_name", + Credentials: []*iam_pb.Credential{ + { + AccessKey: "some_access_key1", + SecretKey: "some_secret_key1", + }, + }, + Actions: []string{ + ACTION_ADMIN, + ACTION_READ, + ACTION_WRITE, + }, + }, + { + Name: "some_read_only_user", + Credentials: []*iam_pb.Credential{ + { + AccessKey: "some_access_key2", + SecretKey: "some_secret_key2", + }, + }, + Actions: []string{ + ACTION_READ, + ACTION_TAGGING, + ACTION_LIST, + }, + }, + }, + }, + "", + }, + { + &iam_pb.S3ApiConfiguration{ + Identities: []*iam_pb.Identity{ + { + Name: "some_name", + Credentials: []*iam_pb.Credential{ + { + AccessKey: "some_access_key1", + SecretKey: "some_secret_key1", + }, + }, + Actions: []string{ + ACTION_ADMIN, + ACTION_READ, + ACTION_WRITE, + }, + }, + { + Name: "some_read_only_user", + Credentials: []*iam_pb.Credential{ + { + AccessKey: "some_access_key1", + SecretKey: "some_secret_key1", + }, + }, + Actions: []string{ + ACTION_READ, + ACTION_TAGGING, + ACTION_LIST, + }, + }, + }, + }, + "duplicate accessKey[some_access_key1], already configured in user[some_name]", + }, + } + for i, test := range tests { + err := CheckDuplicateAccessKey(test.s3cfg) + var errString string + if err == nil { + errString = "" + } else { + errString = err.Error() + } + if errString != test.err { + t.Errorf("[%d]: got: %s expected: %s", i, errString, test.err) + } + } +} diff --git a/weed/mount/page_writer.go b/weed/mount/page_writer.go index 016c4841a..7e3db8e28 100644 --- a/weed/mount/page_writer.go +++ b/weed/mount/page_writer.go @@ -29,14 +29,14 @@ func newPageWriter(fh *FileHandle, chunkSize int64) *PageWriter { return pw } -func (pw *PageWriter) AddPage(offset int64, data []byte, isSequentail bool) { +func (pw *PageWriter) AddPage(offset int64, data []byte, isSequential bool) { glog.V(4).Infof("%v AddPage [%d, %d)", pw.fh.fh, offset, offset+int64(len(data))) chunkIndex := offset / pw.chunkSize for i := chunkIndex; len(data) > 0; i++ { writeSize := min(int64(len(data)), (i+1)*pw.chunkSize-offset) - pw.addToOneChunk(i, offset, data[:writeSize], isSequentail) + pw.addToOneChunk(i, offset, data[:writeSize], isSequential) offset += writeSize data = data[writeSize:] } diff --git a/weed/mount/page_writer_pattern.go b/weed/mount/page_writer_pattern.go index 665056b36..1ec9c9d4c 100644 --- a/weed/mount/page_writer_pattern.go +++ b/weed/mount/page_writer_pattern.go @@ -1,9 +1,9 @@ package mount type WriterPattern struct { - isStreaming bool - lastWriteOffset int64 - chunkSize int64 + isSequentialCounter int64 + lastWriteStopOffset int64 + chunkSize int64 } // For streaming write: only cache the first chunk @@ -12,33 +12,21 @@ type WriterPattern struct { func NewWriterPattern(chunkSize int64) *WriterPattern { return &WriterPattern{ - isStreaming: true, - lastWriteOffset: -1, - chunkSize: chunkSize, + isSequentialCounter: 0, + lastWriteStopOffset: 0, + chunkSize: chunkSize, } } func (rp *WriterPattern) MonitorWriteAt(offset int64, size int) { - if rp.lastWriteOffset > offset { - rp.isStreaming = false + if rp.lastWriteStopOffset == offset { + rp.isSequentialCounter++ + } else { + rp.isSequentialCounter-- } - if rp.lastWriteOffset == -1 { - if offset != 0 { - rp.isStreaming = false - } - } - rp.lastWriteOffset = offset -} - -func (rp *WriterPattern) IsStreamingMode() bool { - return rp.isStreaming -} - -func (rp *WriterPattern) IsRandomMode() bool { - return !rp.isStreaming + rp.lastWriteStopOffset = offset + int64(size) } -func (rp *WriterPattern) Reset() { - rp.isStreaming = true - rp.lastWriteOffset = -1 +func (rp *WriterPattern) IsSequentialMode() bool { + return rp.isSequentialCounter >= 0 } diff --git a/weed/mount/weedfs_file_write.go b/weed/mount/weedfs_file_write.go index d14680752..2b7a6cea2 100644 --- a/weed/mount/weedfs_file_write.go +++ b/weed/mount/weedfs_file_write.go @@ -58,7 +58,7 @@ func (wfs *WFS) Write(cancel <-chan struct{}, in *fuse.WriteIn, data []byte) (wr entry.Attributes.FileSize = uint64(max(offset+int64(len(data)), int64(entry.Attributes.FileSize))) // glog.V(4).Infof("%v write [%d,%d) %d", fh.f.fullpath(), req.Offset, req.Offset+int64(len(req.Data)), len(req.Data)) - fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsStreamingMode()) + fh.dirtyPages.AddPage(offset, data, fh.dirtyPages.writerPattern.IsSequentialMode()) written = uint32(len(data)) diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go index fb23d9ce9..f9e97ea22 100644 --- a/weed/s3api/auth_credentials.go +++ b/weed/s3api/auth_credentials.go @@ -109,6 +109,11 @@ func (iam *IdentityAccessManagement) LoadS3ApiConfigurationFromBytes(content []b glog.Warningf("unmarshal error: %v", err) return fmt.Errorf("unmarshal error: %v", err) } + + if err := filer.CheckDuplicateAccessKey(s3ApiConfiguration); err != nil { + return err + } + if err := iam.loadS3ApiConfiguration(s3ApiConfiguration); err != nil { return err } diff --git a/weed/shell/command_s3_configure.go b/weed/shell/command_s3_configure.go index ddcafd847..422df2e75 100644 --- a/weed/shell/command_s3_configure.go +++ b/weed/shell/command_s3_configure.go @@ -4,11 +4,12 @@ import ( "bytes" "flag" "fmt" - "github.com/chrislusf/seaweedfs/weed/filer" "io" "sort" "strings" + "github.com/chrislusf/seaweedfs/weed/filer" + "github.com/chrislusf/seaweedfs/weed/pb/filer_pb" "github.com/chrislusf/seaweedfs/weed/pb/iam_pb" ) @@ -164,6 +165,10 @@ func (c *commandS3Configure) Do(args []string, commandEnv *CommandEnv, writer io s3cfg.Identities = append(s3cfg.Identities, &identity) } + if err = filer.CheckDuplicateAccessKey(s3cfg); err != nil { + return err + } + buf.Reset() filer.ProtoToText(&buf, s3cfg) |
