aboutsummaryrefslogtreecommitdiff
path: root/weed/server
diff options
context:
space:
mode:
Diffstat (limited to 'weed/server')
-rw-r--r--weed/server/common.go10
-rw-r--r--weed/server/filer_server.go2
-rw-r--r--weed/server/filer_server_handlers.go12
-rw-r--r--weed/server/filer_server_handlers_tagging.go102
-rw-r--r--weed/server/master_grpc_server.go6
-rw-r--r--weed/server/master_server.go2
6 files changed, 123 insertions, 11 deletions
diff --git a/weed/server/common.go b/weed/server/common.go
index 44098a4b5..62ddf6e7f 100644
--- a/weed/server/common.go
+++ b/weed/server/common.go
@@ -38,10 +38,12 @@ func init() {
func writeJson(w http.ResponseWriter, r *http.Request, httpStatus int, obj interface{}) (err error) {
var bytes []byte
- if r.FormValue("pretty") != "" {
- bytes, err = json.MarshalIndent(obj, "", " ")
- } else {
- bytes, err = json.Marshal(obj)
+ if obj != nil {
+ if r.FormValue("pretty") != "" {
+ bytes, err = json.MarshalIndent(obj, "", " ")
+ } else {
+ bytes, err = json.Marshal(obj)
+ }
}
if err != nil {
return
diff --git a/weed/server/filer_server.go b/weed/server/filer_server.go
index 065bb3251..dc93ae062 100644
--- a/weed/server/filer_server.go
+++ b/weed/server/filer_server.go
@@ -89,7 +89,7 @@ func NewFilerServer(defaultMux, readonlyMux *http.ServeMux, option *FilerOption)
glog.Fatal("master list is required!")
}
- fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, func() {
+ fs.filer = filer.NewFiler(option.Masters, fs.grpcDialOption, option.Host, option.Port, option.Collection, option.DefaultReplication, option.DataCenter, func() {
fs.listenersCond.Broadcast()
})
fs.filer.Cipher = option.Cipher
diff --git a/weed/server/filer_server_handlers.go b/weed/server/filer_server_handlers.go
index 555036feb..a64b23927 100644
--- a/weed/server/filer_server_handlers.go
+++ b/weed/server/filer_server_handlers.go
@@ -26,11 +26,19 @@ func (fs *FilerServer) filerHandler(w http.ResponseWriter, r *http.Request) {
stats.FilerRequestHistogram.WithLabelValues("head").Observe(time.Since(start).Seconds())
case "DELETE":
stats.FilerRequestCounter.WithLabelValues("delete").Inc()
- fs.DeleteHandler(w, r)
+ if _, ok := r.URL.Query()["tagging"]; ok {
+ fs.DeleteTaggingHandler(w,r)
+ } else {
+ fs.DeleteHandler(w, r)
+ }
stats.FilerRequestHistogram.WithLabelValues("delete").Observe(time.Since(start).Seconds())
case "PUT":
stats.FilerRequestCounter.WithLabelValues("put").Inc()
- fs.PostHandler(w, r)
+ if _, ok := r.URL.Query()["tagging"]; ok {
+ fs.PutTaggingHandler(w,r)
+ } else {
+ fs.PostHandler(w, r)
+ }
stats.FilerRequestHistogram.WithLabelValues("put").Observe(time.Since(start).Seconds())
case "POST":
stats.FilerRequestCounter.WithLabelValues("post").Inc()
diff --git a/weed/server/filer_server_handlers_tagging.go b/weed/server/filer_server_handlers_tagging.go
new file mode 100644
index 000000000..50b3a2c06
--- /dev/null
+++ b/weed/server/filer_server_handlers_tagging.go
@@ -0,0 +1,102 @@
+package weed_server
+
+import (
+ "context"
+ "net/http"
+ "strings"
+
+ "github.com/chrislusf/seaweedfs/weed/glog"
+ "github.com/chrislusf/seaweedfs/weed/storage/needle"
+ "github.com/chrislusf/seaweedfs/weed/util"
+)
+
+// add or replace one file Seaweed- prefixed attributes
+// curl -X PUT -H "Seaweed-Name1: value1" http://localhost:8888/path/to/a/file?tagging
+func (fs *FilerServer) PutTaggingHandler(w http.ResponseWriter, r *http.Request) {
+
+ ctx := context.Background()
+
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ path = path[:len(path)-1]
+ }
+
+ existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if err != nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+ if existingEntry == nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+
+ if existingEntry.Extended == nil {
+ existingEntry.Extended = make(map[string][]byte)
+ }
+
+ for header, values := range r.Header {
+ if strings.HasPrefix(header, needle.PairNamePrefix) {
+ for _, value := range values {
+ existingEntry.Extended[header] = []byte(value)
+ }
+ }
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
+ glog.V(0).Infof("failing to update %s tagging : %v", path, dbErr)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+
+ writeJsonQuiet(w, r, http.StatusAccepted, nil)
+ return
+}
+
+// remove all Seaweed- prefixed attributes
+// curl -X DELETE http://localhost:8888/path/to/a/file?tagging
+func (fs *FilerServer) DeleteTaggingHandler(w http.ResponseWriter, r *http.Request) {
+
+ ctx := context.Background()
+
+ path := r.URL.Path
+ if strings.HasSuffix(path, "/") {
+ path = path[:len(path)-1]
+ }
+
+ existingEntry, err := fs.filer.FindEntry(ctx, util.FullPath(path))
+ if err != nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+ if existingEntry == nil {
+ writeJsonError(w, r, http.StatusNotFound, err)
+ return
+ }
+
+ if existingEntry.Extended == nil {
+ existingEntry.Extended = make(map[string][]byte)
+ }
+
+ hasDeletion := false
+ for header, _ := range existingEntry.Extended {
+ if strings.HasPrefix(header, needle.PairNamePrefix) {
+ delete(existingEntry.Extended, header)
+ hasDeletion = true
+ }
+ }
+
+ if !hasDeletion {
+ writeJsonQuiet(w, r, http.StatusNotModified, nil)
+ return
+ }
+
+ if dbErr := fs.filer.CreateEntry(ctx, existingEntry, false, false, nil); dbErr != nil {
+ glog.V(0).Infof("failing to delete %s tagging : %v", path, dbErr)
+ writeJsonError(w, r, http.StatusInternalServerError, err)
+ return
+ }
+
+ writeJsonQuiet(w, r, http.StatusAccepted, nil)
+ return
+}
diff --git a/weed/server/master_grpc_server.go b/weed/server/master_grpc_server.go
index e8fa3995d..9df88e956 100644
--- a/weed/server/master_grpc_server.go
+++ b/weed/server/master_grpc_server.go
@@ -86,8 +86,9 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
glog.V(4).Infof("master received heartbeat %s", heartbeat.String())
message := &master_pb.VolumeLocation{
- Url: dn.Url(),
- PublicUrl: dn.PublicUrl,
+ Url: dn.Url(),
+ PublicUrl: dn.PublicUrl,
+ DataCenter: string(dn.GetDataCenter().Id()),
}
if len(heartbeat.NewVolumes) > 0 || len(heartbeat.DeletedVolumes) > 0 {
// process delta volume ids if exists for fast volume id updates
@@ -148,7 +149,6 @@ func (ms *MasterServer) SendHeartbeat(stream master_pb.Seaweed_SendHeartbeatServ
}
}
-
if len(message.NewVids) > 0 || len(message.DeletedVids) > 0 {
ms.clientChansLock.RLock()
for host, ch := range ms.clientChans {
diff --git a/weed/server/master_server.go b/weed/server/master_server.go
index cc1c4b2ad..ccc94ebac 100644
--- a/weed/server/master_server.go
+++ b/weed/server/master_server.go
@@ -93,7 +93,7 @@ func NewMasterServer(r *mux.Router, option *MasterOption, peers []string) *Maste
preallocateSize: preallocateSize,
clientChans: make(map[string]chan *master_pb.VolumeLocation),
grpcDialOption: grpcDialOption,
- MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, peers),
+ MasterClient: wdclient.NewMasterClient(grpcDialOption, "master", option.Host, 0, "", peers),
adminLocks: NewAdminLocks(),
}
ms.bounedLeaderChan = make(chan int, 16)