aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
Diffstat (limited to 'weed')
-rw-r--r--weed/command/command.go1
-rw-r--r--weed/command/gateway.go93
-rw-r--r--weed/filesys/dir.go1
-rw-r--r--weed/filesys/filehandle.go2
-rw-r--r--weed/operation/upload_content.go2
-rw-r--r--weed/server/filer_server_handlers_write_upload.go170
-rw-r--r--weed/server/gateway_server.go106
-rw-r--r--weed/util/constants.go2
-rw-r--r--weed/util/http_util.go21
9 files changed, 275 insertions, 123 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index ce754702f..b6efcead2 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -22,6 +22,7 @@ var Commands = []*Command{
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,
+ cmdGateway,
cmdMaster,
cmdMount,
cmdS3,
diff --git a/weed/command/gateway.go b/weed/command/gateway.go
new file mode 100644
index 000000000..8a6f852a5
--- /dev/null
+++ b/weed/command/gateway.go
@@ -0,0 +1,93 @@
+package command
+
+import (
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ gatewayOptions GatewayOptions
+)
+
+type GatewayOptions struct {
+ masters *string
+ filers *string
+ bindIp *string
+ port *int
+ maxMB *int
+}
+
+func init() {
+ cmdGateway.Run = runGateway // break init cycle
+ gatewayOptions.masters = cmdGateway.Flag.String("master", "localhost:9333", "comma-separated master servers")
+ gatewayOptions.filers = cmdGateway.Flag.String("filer", "localhost:8888", "comma-separated filer servers")
+ gatewayOptions.bindIp = cmdGateway.Flag.String("ip.bind", "localhost", "ip address to bind to")
+ gatewayOptions.port = cmdGateway.Flag.Int("port", 5647, "gateway http listen port")
+ gatewayOptions.maxMB = cmdGateway.Flag.Int("maxMB", 4, "split files larger than the limit")
+}
+
+var cmdGateway = &Command{
+ UsageLine: "gateway -port=8888 -master=<ip:port>[,<ip:port>]* -filer=<ip:port>[,<ip:port>]*",
+ Short: "start a gateway server that points to a list of master servers or a list of filers",
+ Long: `start a gateway server which accepts REST operation to write any blobs, files, or topic messages.
+
+ POST /blobs/
+ upload the blob and return a chunk id
+ DELETE /blobs/<chunk_id>
+ delete a chunk id
+
+ /*
+ POST /files/path/to/a/file
+ save /path/to/a/file on filer
+ DELETE /files/path/to/a/file
+ delete /path/to/a/file on filer
+
+ POST /topics/topicName
+ save on filer to /topics/topicName/<ds>/ts.json
+ */
+`,
+}
+
+func runGateway(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ gatewayOptions.startGateway()
+
+ return true
+}
+
+func (gw *GatewayOptions) startGateway() {
+
+ defaultMux := http.NewServeMux()
+
+ _, gws_err := weed_server.NewGatewayServer(defaultMux, &weed_server.GatewayOption{
+ Masters: strings.Split(*gw.masters, ","),
+ Filers: strings.Split(*gw.filers, ","),
+ MaxMB: *gw.maxMB,
+ })
+ if gws_err != nil {
+ glog.Fatalf("Gateway startup error: %v", gws_err)
+ }
+
+ glog.V(0).Infof("Start Seaweed Gateway %s at %s:%d", util.Version(), *gw.bindIp, *gw.port)
+ gatewayListener, e := util.NewListener(
+ *gw.bindIp+":"+strconv.Itoa(*gw.port),
+ time.Duration(10)*time.Second,
+ )
+ if e != nil {
+ glog.Fatalf("Gateway listener error: %v", e)
+ }
+
+ httpS := &http.Server{Handler: defaultMux}
+ if err := httpS.Serve(gatewayListener); err != nil {
+ glog.Fatalf("Gateway Fail to serve: %v", err)
+ }
+
+}
diff --git a/weed/filesys/dir.go b/weed/filesys/dir.go
index 46457f858..7b918e769 100644
--- a/weed/filesys/dir.go
+++ b/weed/filesys/dir.go
@@ -372,7 +372,6 @@ func (dir *Dir) Remove(ctx context.Context, req *fuse.RemoveRequest) error {
return fuse.EPERM
}
-
if !req.Dir {
return dir.removeOneFile(req)
}
diff --git a/weed/filesys/filehandle.go b/weed/filesys/filehandle.go
index 4419888c4..f04952e96 100644
--- a/weed/filesys/filehandle.go
+++ b/weed/filesys/filehandle.go
@@ -200,6 +200,8 @@ func (fh *FileHandle) Release(ctx context.Context, req *fuse.ReleaseRequest) err
fh.Lock()
defer fh.Unlock()
+ fh.f.entryViewCache = nil
+
if fh.f.isOpen <= 0 {
glog.V(0).Infof("Release reset %s open count %d => %d", fh.f.Name, fh.f.isOpen, 0)
fh.f.isOpen = 0
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 40e96fd8c..7a7f8aa0c 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -235,7 +235,7 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
// print("+")
resp, post_err := HttpClient.Do(req)
if post_err != nil {
- if !strings.Contains(post_err.Error(), "connection reset by peer"){
+ if !strings.Contains(post_err.Error(), "connection reset by peer") {
glog.Errorf("upload %s %d bytes to %v: %v", filename, originalDataSize, uploadUrl, post_err)
debug.PrintStack()
}
diff --git a/weed/server/filer_server_handlers_write_upload.go b/weed/server/filer_server_handlers_write_upload.go
index 81b2ce1b0..3ab45453e 100644
--- a/weed/server/filer_server_handlers_write_upload.go
+++ b/weed/server/filer_server_handlers_write_upload.go
@@ -6,9 +6,7 @@ import (
"io"
"io/ioutil"
"net/http"
- "runtime"
"strings"
- "sync"
"time"
"github.com/chrislusf/seaweedfs/weed/filer"
@@ -20,143 +18,75 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-var (
- limitedUploadProcessor = util.NewLimitedOutOfOrderProcessor(int32(runtime.NumCPU()))
-)
-
-func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) (fileChunks []*filer_pb.FileChunk, md5Hash hash.Hash, dataSize int64, err error, smallContent []byte) {
+func (fs *FilerServer) uploadReaderToChunks(w http.ResponseWriter, r *http.Request, reader io.Reader, chunkSize int32, fileName, contentType string, contentLength int64, so *operation.StorageOption) ([]*filer_pb.FileChunk, hash.Hash, int64, error, []byte) {
+ var fileChunks []*filer_pb.FileChunk
- md5Hash = md5.New()
+ md5Hash := md5.New()
var partReader = ioutil.NopCloser(io.TeeReader(reader, md5Hash))
- // save small content directly
- if !isAppend(r) && ((0 < contentLength && contentLength < fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && contentLength < 4*1024) {
- smallContent, err = ioutil.ReadAll(partReader)
- dataSize = int64(len(smallContent))
- return
- }
+ chunkOffset := int64(0)
+ var smallContent []byte
- resultsChan := make(chan *ChunkCreationResult, 2)
+ for {
+ limitedReader := io.LimitReader(partReader, int64(chunkSize))
- var waitForAllData sync.WaitGroup
- waitForAllData.Add(1)
- go func() {
- // process upload results
- defer waitForAllData.Done()
- for result := range resultsChan {
- if result.err != nil {
- err = result.err
- continue
- }
-
- // Save to chunk manifest structure
- fileChunks = append(fileChunks, result.chunk)
+ data, err := ioutil.ReadAll(limitedReader)
+ if err != nil {
+ return nil, nil, 0, err, nil
}
- }()
-
- var lock sync.Mutex
- readOffset := int64(0)
- var wg sync.WaitGroup
-
- for err == nil {
-
- wg.Add(1)
- request := func() {
- defer wg.Done()
-
- var localOffset int64
- // read from the input
- lock.Lock()
- localOffset = readOffset
- limitedReader := io.LimitReader(partReader, int64(chunkSize))
- data, readErr := ioutil.ReadAll(limitedReader)
- readOffset += int64(len(data))
- lock.Unlock()
- // handle read errors
- if readErr != nil {
- if err == nil {
- err = readErr
- }
- if readErr != io.EOF {
- resultsChan <- &ChunkCreationResult{
- err: readErr,
- }
- }
- return
+ if chunkOffset == 0 && !isAppend(r) {
+ if len(data) < int(fs.option.SaveToFilerLimit) || strings.HasPrefix(r.URL.Path, filer.DirectoryEtcRoot) && len(data) < 4*1024 {
+ smallContent = data
+ chunkOffset += int64(len(data))
+ break
}
- if len(data) == 0 {
- readErr = io.EOF
- if err == nil {
- err = readErr
- }
- return
+ }
+ dataReader := util.NewBytesReader(data)
+
+ // retry to assign a different file id
+ var fileId, urlLocation string
+ var auth security.EncodedJwt
+ var assignErr, uploadErr error
+ var uploadResult *operation.UploadResult
+ for i := 0; i < 3; i++ {
+ // assign one file id for one chunk
+ fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
+ if assignErr != nil {
+ return nil, nil, 0, assignErr, nil
}
- // upload
- dataReader := util.NewBytesReader(data)
- fileId, uploadResult, uploadErr := fs.doCreateChunk(w, r, so, dataReader, fileName, contentType)
+ // upload the chunk to the volume server
+ uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
if uploadErr != nil {
- if err == nil {
- err = uploadErr
- }
- resultsChan <- &ChunkCreationResult{
- err: uploadErr,
- }
- return
- }
-
- glog.V(4).Infof("uploaded %s to %s [%d,%d)", fileName, fileId, localOffset, localOffset+int64(uploadResult.Size))
-
- // send back uploaded file chunk
- resultsChan <- &ChunkCreationResult{
- chunk: uploadResult.ToPbFileChunk(fileId, localOffset),
+ time.Sleep(251 * time.Millisecond)
+ continue
}
-
+ break
+ }
+ if uploadErr != nil {
+ return nil, nil, 0, uploadErr, nil
}
- limitedUploadProcessor.Execute(request)
- }
-
- go func() {
- wg.Wait()
- close(resultsChan)
- }()
-
- waitForAllData.Wait()
- if err == io.EOF {
- err = nil
- }
+ // if last chunk exhausted the reader exactly at the border
+ if uploadResult.Size == 0 {
+ break
+ }
- return fileChunks, md5Hash, readOffset, err, nil
-}
+ // Save to chunk manifest structure
+ fileChunks = append(fileChunks, uploadResult.ToPbFileChunk(fileId, chunkOffset))
-type ChunkCreationResult struct {
- chunk *filer_pb.FileChunk
- err error
-}
+ glog.V(4).Infof("uploaded %s chunk %d to %s [%d,%d)", fileName, len(fileChunks), fileId, chunkOffset, chunkOffset+int64(uploadResult.Size))
-func (fs *FilerServer) doCreateChunk(w http.ResponseWriter, r *http.Request, so *operation.StorageOption, dataReader *util.BytesReader, fileName string, contentType string) (string, *operation.UploadResult, error) {
- // retry to assign a different file id
- var fileId, urlLocation string
- var auth security.EncodedJwt
- var assignErr, uploadErr error
- var uploadResult *operation.UploadResult
- for i := 0; i < 3; i++ {
- // assign one file id for one chunk
- fileId, urlLocation, auth, assignErr = fs.assignNewFileInfo(so)
- if assignErr != nil {
- return "", nil, assignErr
- }
+ // reset variables for the next chunk
+ chunkOffset = chunkOffset + int64(uploadResult.Size)
- // upload the chunk to the volume server
- uploadResult, uploadErr, _ = fs.doUpload(urlLocation, w, r, dataReader, fileName, contentType, nil, auth)
- if uploadErr != nil {
- time.Sleep(251 * time.Millisecond)
- continue
+ // if last chunk was not at full chunk size, but already exhausted the reader
+ if int64(uploadResult.Size) < int64(chunkSize) {
+ break
}
- break
}
- return fileId, uploadResult, uploadErr
+
+ return fileChunks, md5Hash, chunkOffset, nil, smallContent
}
func (fs *FilerServer) doUpload(urlLocation string, w http.ResponseWriter, r *http.Request, limitedReader io.Reader, fileName string, contentType string, pairMap map[string]string, auth security.EncodedJwt) (*operation.UploadResult, error, []byte) {
diff --git a/weed/server/gateway_server.go b/weed/server/gateway_server.go
new file mode 100644
index 000000000..608217ed7
--- /dev/null
+++ b/weed/server/gateway_server.go
@@ -0,0 +1,106 @@
+package weed_server
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "google.golang.org/grpc"
+ "math/rand"
+ "net/http"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+
+ _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/log"
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+type GatewayOption struct {
+ Masters []string
+ Filers []string
+ MaxMB int
+ IsSecure bool
+}
+
+type GatewayServer struct {
+ option *GatewayOption
+ secret security.SigningKey
+ grpcDialOption grpc.DialOption
+}
+
+func NewGatewayServer(defaultMux *http.ServeMux, option *GatewayOption) (fs *GatewayServer, err error) {
+
+ fs = &GatewayServer{
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
+ }
+
+ if len(option.Masters) == 0 {
+ glog.Fatal("master list is required!")
+ }
+
+ defaultMux.HandleFunc("/blobs/", fs.blobsHandler)
+ defaultMux.HandleFunc("/files/", fs.filesHandler)
+ defaultMux.HandleFunc("/topics/", fs.topicsHandler)
+
+ return fs, nil
+}
+
+func (fs *GatewayServer) getMaster() string {
+ randMaster := rand.Intn(len(fs.option.Masters))
+ return fs.option.Masters[randMaster]
+}
+
+func (fs *GatewayServer) blobsHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "DELETE":
+ chunkId := r.URL.Path[len("/blobs/"):]
+ fullUrl, err := operation.LookupFileId(fs.getMaster, chunkId)
+ if err != nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+ var jwtAuthorization security.EncodedJwt
+ if fs.option.IsSecure {
+ jwtAuthorization = operation.LookupJwt(fs.getMaster(), chunkId)
+ }
+ body, statusCode, err := util.DeleteProxied(fullUrl, string(jwtAuthorization))
+ if err != nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+ w.WriteHeader(statusCode)
+ w.Write(body)
+ case "POST":
+ submitForClientHandler(w, r, fs.getMaster, fs.grpcDialOption)
+ }
+}
+
+func (fs *GatewayServer) filesHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "DELETE":
+ case "POST":
+ }
+}
+
+func (fs *GatewayServer) topicsHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "POST":
+ }
+}
diff --git a/weed/util/constants.go b/weed/util/constants.go
index 40f4deae2..fce35379d 100644
--- a/weed/util/constants.go
+++ b/weed/util/constants.go
@@ -5,7 +5,7 @@ import (
)
var (
- VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 38)
+ VERSION = fmt.Sprintf("%s %d.%02d", sizeLimit, 2, 39)
COMMIT = ""
)
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 135d10c45..1630760b1 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -124,6 +124,27 @@ func Delete(url string, jwt string) error {
return errors.New(string(body))
}
+func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
+ req, err := http.NewRequest("DELETE", url, nil)
+ if err != nil {
+ return
+ }
+ if jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(jwt))
+ }
+ resp, err := client.Do(req)
+ if err != nil {
+ return
+ }
+ defer resp.Body.Close()
+ body, err = ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return
+ }
+ httpStatus = resp.StatusCode
+ return
+}
+
func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
r, err := client.PostForm(url, values)
if err != nil {