aboutsummaryrefslogtreecommitdiff
path: root/weed
diff options
context:
space:
mode:
authorChris Lu <chris.lu@uber.com>2021-04-10 23:47:47 -0700
committerChris Lu <chris.lu@uber.com>2021-04-10 23:47:47 -0700
commitaf313dff58bf82a731dbce72535b72f1979d6740 (patch)
tree3799d0cc618c248c6c3bca4bdcc002e62ea92727 /weed
parent98c08a3dcd3d3a3115828dc04b5f11e56cd67489 (diff)
downloadseaweedfs-af313dff58bf82a731dbce72535b72f1979d6740.tar.xz
seaweedfs-af313dff58bf82a731dbce72535b72f1979d6740.zip
add gateway for easier POST and DELETE blobs
Diffstat (limited to 'weed')
-rw-r--r--weed/command/command.go1
-rw-r--r--weed/command/gateway.go93
-rw-r--r--weed/server/gateway_server.go106
-rw-r--r--weed/util/http_util.go22
4 files changed, 222 insertions, 0 deletions
diff --git a/weed/command/command.go b/weed/command/command.go
index ce754702f..b6efcead2 100644
--- a/weed/command/command.go
+++ b/weed/command/command.go
@@ -22,6 +22,7 @@ var Commands = []*Command{
cmdFilerReplicate,
cmdFilerSynchronize,
cmdFix,
+ cmdGateway,
cmdMaster,
cmdMount,
cmdS3,
diff --git a/weed/command/gateway.go b/weed/command/gateway.go
new file mode 100644
index 000000000..a2a97889f
--- /dev/null
+++ b/weed/command/gateway.go
@@ -0,0 +1,93 @@
+package command
+
+import (
+ "net/http"
+ "strconv"
+ "strings"
+ "time"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/server"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+var (
+ gatewayOptions GatewayOptions
+)
+
+type GatewayOptions struct {
+ masters *string
+ filers *string
+ bindIp *string
+ port *int
+ maxMB *int
+}
+
+func init() {
+ cmdGateway.Run = runGateway // break init cycle
+ gatewayOptions.masters = cmdGateway.Flag.String("master", "localhost:9333", "comma-separated master servers")
+ gatewayOptions.filers = cmdGateway.Flag.String("filer", "localhost:8888", "comma-separated filer servers")
+ gatewayOptions.bindIp = cmdGateway.Flag.String("ip.bind", "localhost", "ip address to bind to")
+ gatewayOptions.port = cmdGateway.Flag.Int("port", 5647, "gateway http listen port")
+ gatewayOptions.maxMB = cmdGateway.Flag.Int("maxMB", 4, "split files larger than the limit")
+}
+
+var cmdGateway = &Command{
+ UsageLine: "gateway -port=8888 -master=<ip:port>[,<ip:port>]* -filer=<ip:port>[,<ip:port>]*",
+ Short: "start a gateway server that points to a list of master servers or a list of filers",
+ Long: `start a gateway server which accepts REST operation to write any blobs, files, or topic messages.
+
+ POST /blobs/
+ return a chunk id
+ DELETE /blobs/<chunk_id>
+ delete a chunk id
+
+ /*
+ POST /files/path/to/a/file
+ save /path/to/a/file on filer
+ DELETE /files/path/to/a/file
+ delete /path/to/a/file on filer
+
+ POST /topics/topicName
+ save on filer to /topics/topicName/<ds>/ts.json
+ */
+`,
+}
+
+func runGateway(cmd *Command, args []string) bool {
+
+ util.LoadConfiguration("security", false)
+
+ gatewayOptions.startGateway()
+
+ return true
+}
+
+func (gw *GatewayOptions) startGateway() {
+
+ defaultMux := http.NewServeMux()
+
+ _, gws_err := weed_server.NewGatewayServer(defaultMux, &weed_server.GatewayOption{
+ Masters: strings.Split(*gw.masters, ","),
+ Filers: strings.Split(*gw.filers, ","),
+ MaxMB: *gw.maxMB,
+ })
+ if gws_err != nil {
+ glog.Fatalf("Gateway startup error: %v", gws_err)
+ }
+
+ glog.V(0).Infof("Start Seaweed Gateway %s at %s:%d", util.Version(), *gw.bindIp, *gw.port)
+ gatewayListener, e := util.NewListener(
+ *gw.bindIp+":"+strconv.Itoa(*gw.port),
+ time.Duration(10)*time.Second,
+ )
+ if e != nil {
+ glog.Fatalf("Filer listener error: %v", e)
+ }
+
+ httpS := &http.Server{Handler: defaultMux}
+ if err := httpS.Serve(gatewayListener); err != nil {
+ glog.Fatalf("Gateway Fail to serve: %v", e)
+ }
+
+}
diff --git a/weed/server/gateway_server.go b/weed/server/gateway_server.go
new file mode 100644
index 000000000..608217ed7
--- /dev/null
+++ b/weed/server/gateway_server.go
@@ -0,0 +1,106 @@
+package weed_server
+
+import (
+ "github.com/chrislusf/seaweedfs/weed/operation"
+ "google.golang.org/grpc"
+ "math/rand"
+ "net/http"
+
+ "github.com/chrislusf/seaweedfs/weed/util"
+
+ _ "github.com/chrislusf/seaweedfs/weed/filer/cassandra"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/elastic/v7"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/etcd"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/hbase"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/leveldb3"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mongodb"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/mysql2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/postgres2"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis"
+ _ "github.com/chrislusf/seaweedfs/weed/filer/redis2"
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/aws_sqs"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/gocdk_pub_sub"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/google_pub_sub"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/kafka"
+ _ "github.com/chrislusf/seaweedfs/weed/notification/log"
+ "github.com/chrislusf/seaweedfs/weed/security"
+)
+
+type GatewayOption struct {
+ Masters []string
+ Filers []string
+ MaxMB int
+ IsSecure bool
+}
+
+type GatewayServer struct {
+ option *GatewayOption
+ secret security.SigningKey
+ grpcDialOption grpc.DialOption
+}
+
+func NewGatewayServer(defaultMux *http.ServeMux, option *GatewayOption) (fs *GatewayServer, err error) {
+
+ fs = &GatewayServer{
+ option: option,
+ grpcDialOption: security.LoadClientTLS(util.GetViper(), "grpc.client"),
+ }
+
+ if len(option.Masters) == 0 {
+ glog.Fatal("master list is required!")
+ }
+
+ defaultMux.HandleFunc("/blobs/", fs.blobsHandler)
+ defaultMux.HandleFunc("/files/", fs.filesHandler)
+ defaultMux.HandleFunc("/topics/", fs.topicsHandler)
+
+ return fs, nil
+}
+
+func (fs *GatewayServer) getMaster() string {
+ randMaster := rand.Intn(len(fs.option.Masters))
+ return fs.option.Masters[randMaster]
+}
+
+func (fs *GatewayServer) blobsHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "DELETE":
+ chunkId := r.URL.Path[len("/blobs/"):]
+ fullUrl, err := operation.LookupFileId(fs.getMaster, chunkId)
+ if err != nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+ var jwtAuthorization security.EncodedJwt
+ if fs.option.IsSecure {
+ jwtAuthorization = operation.LookupJwt(fs.getMaster(), chunkId)
+ }
+ body, statusCode, err := util.DeleteProxied(fullUrl, string(jwtAuthorization))
+ if err != nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+ w.WriteHeader(statusCode)
+ w.Write(body)
+ case "POST":
+ submitForClientHandler(w, r, fs.getMaster, fs.grpcDialOption)
+ }
+}
+
+func (fs *GatewayServer) filesHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "DELETE":
+ case "POST":
+ }
+}
+
+func (fs *GatewayServer) topicsHandler(w http.ResponseWriter, r *http.Request) {
+ switch r.Method {
+ case "POST":
+ }
+}
diff --git a/weed/util/http_util.go b/weed/util/http_util.go
index 135d10c45..1c1b2b377 100644
--- a/weed/util/http_util.go
+++ b/weed/util/http_util.go
@@ -124,6 +124,28 @@ func Delete(url string, jwt string) error {
return errors.New(string(body))
}
+func DeleteProxied(url string, jwt string) (body []byte, httpStatus int, err error) {
+ req, err := http.NewRequest("DELETE", url, nil)
+ if jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(jwt))
+ }
+ if err != nil {
+ return
+ }
+ resp, err := client.Do(req)
+ if err != nil {
+ return
+ }
+ defer resp.Body.Close()
+ body, err = ioutil.ReadAll(resp.Body)
+ if err != nil {
+ return
+ }
+ httpStatus = resp.StatusCode
+ return
+}
+
+
func GetBufferStream(url string, values url.Values, allocatedBytes []byte, eachBuffer func([]byte)) error {
r, err := client.PostForm(url, values)
if err != nil {