aboutsummaryrefslogtreecommitdiff
path: root/weed/operation
diff options
context:
space:
mode:
Diffstat (limited to 'weed/operation')
-rw-r--r--weed/operation/assign_file_id.go34
-rw-r--r--weed/operation/chunked_file.go5
-rw-r--r--weed/operation/data_struts.go1
-rw-r--r--weed/operation/delete_content.go17
-rw-r--r--weed/operation/grpc_client.go8
-rw-r--r--weed/operation/lookup.go5
-rw-r--r--weed/operation/stats.go5
-rw-r--r--weed/operation/submit.go26
-rw-r--r--weed/operation/sync_volume.go9
-rw-r--r--weed/operation/upload_content.go6
10 files changed, 69 insertions, 47 deletions
diff --git a/weed/operation/assign_file_id.go b/weed/operation/assign_file_id.go
index 00e1caad5..7e7a9059d 100644
--- a/weed/operation/assign_file_id.go
+++ b/weed/operation/assign_file_id.go
@@ -3,9 +3,13 @@ package operation
import (
"context"
"fmt"
+ "google.golang.org/grpc"
+ "strings"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
+ "github.com/chrislusf/seaweedfs/weed/security"
+ "github.com/chrislusf/seaweedfs/weed/util"
)
type VolumeAssignRequest struct {
@@ -19,14 +23,15 @@ type VolumeAssignRequest struct {
}
type AssignResult struct {
- Fid string `json:"fid,omitempty"`
- Url string `json:"url,omitempty"`
- PublicUrl string `json:"publicUrl,omitempty"`
- Count uint64 `json:"count,omitempty"`
- Error string `json:"error,omitempty"`
+ Fid string `json:"fid,omitempty"`
+ Url string `json:"url,omitempty"`
+ PublicUrl string `json:"publicUrl,omitempty"`
+ Count uint64 `json:"count,omitempty"`
+ Error string `json:"error,omitempty"`
+ Auth security.EncodedJwt `json:"auth,omitempty"`
}
-func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
+func Assign(server string, grpcDialOption grpc.DialOption, primaryRequest *VolumeAssignRequest, alternativeRequests ...*VolumeAssignRequest) (*AssignResult, error) {
var requests []*VolumeAssignRequest
requests = append(requests, primaryRequest)
@@ -40,7 +45,7 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque
continue
}
- lastError = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
+ lastError = withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
@@ -63,6 +68,7 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque
ret.Url = resp.Url
ret.PublicUrl = resp.PublicUrl
ret.Error = resp.Error
+ ret.Auth = security.EncodedJwt(resp.Auth)
return nil
@@ -81,3 +87,17 @@ func Assign(server string, primaryRequest *VolumeAssignRequest, alternativeReque
return ret, lastError
}
+
+func LookupJwt(master string, fileId string) security.EncodedJwt {
+
+ tokenStr := ""
+
+ if h, e := util.Head(fmt.Sprintf("http://%s/dir/lookup?fileId=%s", master, fileId)); e == nil {
+ bearer := h.Get("Authorization")
+ if len(bearer) > 7 && strings.ToUpper(bearer[0:6]) == "BEARER" {
+ tokenStr = bearer[7:]
+ }
+ }
+
+ return security.EncodedJwt(tokenStr)
+}
diff --git a/weed/operation/chunked_file.go b/weed/operation/chunked_file.go
index 9d8267dee..f3f6e7b00 100644
--- a/weed/operation/chunked_file.go
+++ b/weed/operation/chunked_file.go
@@ -4,6 +4,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "google.golang.org/grpc"
"io"
"net/http"
"sort"
@@ -69,12 +70,12 @@ func (cm *ChunkManifest) Marshal() ([]byte, error) {
return json.Marshal(cm)
}
-func (cm *ChunkManifest) DeleteChunks(master string) error {
+func (cm *ChunkManifest) DeleteChunks(master string, grpcDialOption grpc.DialOption) error {
var fileIds []string
for _, ci := range cm.Chunks {
fileIds = append(fileIds, ci.Fid)
}
- results, err := DeleteFiles(master, fileIds)
+ results, err := DeleteFiles(master, grpcDialOption, fileIds)
if err != nil {
glog.V(0).Infof("delete %+v: %v", fileIds, err)
return fmt.Errorf("chunk delete: %v", err)
diff --git a/weed/operation/data_struts.go b/weed/operation/data_struts.go
index bfc53aa50..4980f9913 100644
--- a/weed/operation/data_struts.go
+++ b/weed/operation/data_struts.go
@@ -2,6 +2,5 @@ package operation
type JoinResult struct {
VolumeSizeLimit uint64 `json:"VolumeSizeLimit,omitempty"`
- SecretKey string `json:"secretKey,omitempty"`
Error string `json:"error,omitempty"`
}
diff --git a/weed/operation/delete_content.go b/weed/operation/delete_content.go
index 3e468e1a3..1df95211e 100644
--- a/weed/operation/delete_content.go
+++ b/weed/operation/delete_content.go
@@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
+ "google.golang.org/grpc"
"net/http"
"strings"
"sync"
@@ -28,17 +29,17 @@ func ParseFileId(fid string) (vid string, key_cookie string, err error) {
}
// DeleteFiles batch deletes a list of fileIds
-func DeleteFiles(master string, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
+func DeleteFiles(master string, grpcDialOption grpc.DialOption, fileIds []string) ([]*volume_server_pb.DeleteResult, error) {
lookupFunc := func(vids []string) (map[string]LookupResult, error) {
- return LookupVolumeIds(master, vids)
+ return LookupVolumeIds(master, grpcDialOption, vids)
}
- return DeleteFilesWithLookupVolumeId(fileIds, lookupFunc)
+ return DeleteFilesWithLookupVolumeId(grpcDialOption, fileIds, lookupFunc)
}
-func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
+func DeleteFilesWithLookupVolumeId(grpcDialOption grpc.DialOption, fileIds []string, lookupFunc func(vid []string) (map[string]LookupResult, error)) ([]*volume_server_pb.DeleteResult, error) {
var ret []*volume_server_pb.DeleteResult
@@ -48,7 +49,7 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin
vid, _, err := ParseFileId(fileId)
if err != nil {
ret = append(ret, &volume_server_pb.DeleteResult{
- FileId: vid,
+ FileId: fileId,
Status: http.StatusBadRequest,
Error: err.Error()},
)
@@ -92,7 +93,7 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin
go func(server string, fidList []string) {
defer wg.Done()
- if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, fidList); deleteErr != nil {
+ if deleteResults, deleteErr := DeleteFilesAtOneVolumeServer(server, grpcDialOption, fidList); deleteErr != nil {
err = deleteErr
} else {
ret = append(ret, deleteResults...)
@@ -106,9 +107,9 @@ func DeleteFilesWithLookupVolumeId(fileIds []string, lookupFunc func(vid []strin
}
// DeleteFilesAtOneVolumeServer deletes a list of files that is on one volume server via gRpc
-func DeleteFilesAtOneVolumeServer(volumeServer string, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
+func DeleteFilesAtOneVolumeServer(volumeServer string, grpcDialOption grpc.DialOption, fileIds []string) (ret []*volume_server_pb.DeleteResult, err error) {
- err = WithVolumeServerClient(volumeServer, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
+ err = WithVolumeServerClient(volumeServer, grpcDialOption, func(volumeServerClient volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
diff --git a/weed/operation/grpc_client.go b/weed/operation/grpc_client.go
index d0931a8d3..a02844657 100644
--- a/weed/operation/grpc_client.go
+++ b/weed/operation/grpc_client.go
@@ -18,7 +18,7 @@ var (
grpcClientsLock sync.Mutex
)
-func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.VolumeServerClient) error) error {
+func WithVolumeServerClient(volumeServer string, grpcDialOption grpc.DialOption, fn func(volume_server_pb.VolumeServerClient) error) error {
grpcAddress, err := toVolumeServerGrpcAddress(volumeServer)
if err != nil {
@@ -28,7 +28,7 @@ func WithVolumeServerClient(volumeServer string, fn func(volume_server_pb.Volume
return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := volume_server_pb.NewVolumeServerClient(grpcConnection)
return fn(client)
- }, grpcAddress)
+ }, grpcAddress, grpcDialOption)
}
@@ -42,7 +42,7 @@ func toVolumeServerGrpcAddress(volumeServer string) (grpcAddress string, err err
return fmt.Sprintf("%s:%d", volumeServer[0:sepIndex], port+10000), nil
}
-func withMasterServerClient(masterServer string, fn func(masterClient master_pb.SeaweedClient) error) error {
+func withMasterServerClient(masterServer string, grpcDialOption grpc.DialOption, fn func(masterClient master_pb.SeaweedClient) error) error {
masterGrpcAddress, parseErr := util.ParseServerToGrpcAddress(masterServer, 0)
if parseErr != nil {
@@ -52,6 +52,6 @@ func withMasterServerClient(masterServer string, fn func(masterClient master_pb.
return util.WithCachedGrpcClient(func(grpcConnection *grpc.ClientConn) error {
client := master_pb.NewSeaweedClient(grpcConnection)
return fn(client)
- }, masterGrpcAddress)
+ }, masterGrpcAddress, grpcDialOption)
}
diff --git a/weed/operation/lookup.go b/weed/operation/lookup.go
index 562a11580..c4040f3e7 100644
--- a/weed/operation/lookup.go
+++ b/weed/operation/lookup.go
@@ -5,6 +5,7 @@ import (
"encoding/json"
"errors"
"fmt"
+ "google.golang.org/grpc"
"math/rand"
"net/url"
"strings"
@@ -78,7 +79,7 @@ func LookupFileId(server string, fileId string) (fullUrl string, err error) {
}
// LookupVolumeIds find volume locations by cache and actual lookup
-func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, error) {
+func LookupVolumeIds(server string, grpcDialOption grpc.DialOption, vids []string) (map[string]LookupResult, error) {
ret := make(map[string]LookupResult)
var unknown_vids []string
@@ -98,7 +99,7 @@ func LookupVolumeIds(server string, vids []string) (map[string]LookupResult, err
//only query unknown_vids
- err := withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
+ err := withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
diff --git a/weed/operation/stats.go b/weed/operation/stats.go
index 364727272..9f7166864 100644
--- a/weed/operation/stats.go
+++ b/weed/operation/stats.go
@@ -2,14 +2,15 @@ package operation
import (
"context"
+ "google.golang.org/grpc"
"time"
"github.com/chrislusf/seaweedfs/weed/pb/master_pb"
)
-func Statistics(server string, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
+func Statistics(server string, grpcDialOption grpc.DialOption, req *master_pb.StatisticsRequest) (resp *master_pb.StatisticsResponse, err error) {
- err = withMasterServerClient(server, func(masterClient master_pb.SeaweedClient) error {
+ err = withMasterServerClient(server, grpcDialOption, func(masterClient master_pb.SeaweedClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
diff --git a/weed/operation/submit.go b/weed/operation/submit.go
index 7a1a3085e..bdf59d966 100644
--- a/weed/operation/submit.go
+++ b/weed/operation/submit.go
@@ -2,6 +2,7 @@ package operation
import (
"bytes"
+ "google.golang.org/grpc"
"io"
"mime"
"net/url"
@@ -36,10 +37,8 @@ type SubmitResult struct {
Error string `json:"error,omitempty"`
}
-func SubmitFiles(master string, files []FilePart,
- replication string, collection string, dataCenter string, ttl string, maxMB int,
- secret security.Secret,
-) ([]SubmitResult, error) {
+func SubmitFiles(master string, grpcDialOption grpc.DialOption, files []FilePart,
+ replication string, collection string, dataCenter string, ttl string, maxMB int) ([]SubmitResult, error) {
results := make([]SubmitResult, len(files))
for index, file := range files {
results[index].FileName = file.FileName
@@ -51,7 +50,7 @@ func SubmitFiles(master string, files []FilePart,
DataCenter: dataCenter,
Ttl: ttl,
}
- ret, err := Assign(master, ar)
+ ret, err := Assign(master, grpcDialOption, ar)
if err != nil {
for index, _ := range files {
results[index].Error = err.Error()
@@ -67,7 +66,7 @@ func SubmitFiles(master string, files []FilePart,
file.Replication = replication
file.Collection = collection
file.DataCenter = dataCenter
- results[index].Size, err = file.Upload(maxMB, master, secret)
+ results[index].Size, err = file.Upload(maxMB, master, ret.Auth, grpcDialOption)
if err != nil {
results[index].Error = err.Error()
}
@@ -110,8 +109,7 @@ func newFilePart(fullPathFilename string) (ret FilePart, err error) {
return ret, nil
}
-func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (retSize uint32, err error) {
- jwt := security.GenJwt(secret, fi.Fid)
+func (fi FilePart) Upload(maxMB int, master string, jwt security.EncodedJwt, grpcDialOption grpc.DialOption) (retSize uint32, err error) {
fileUrl := "http://" + fi.Server + "/" + fi.Fid
if fi.ModTime != 0 {
fileUrl += "?ts=" + strconv.Itoa(int(fi.ModTime))
@@ -139,7 +137,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
Collection: fi.Collection,
Ttl: fi.Ttl,
}
- ret, err = Assign(master, ar)
+ ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
return
}
@@ -152,10 +150,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
Collection: fi.Collection,
Ttl: fi.Ttl,
}
- ret, err = Assign(master, ar)
+ ret, err = Assign(master, grpcDialOption, ar)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
return
}
id = ret.Fid
@@ -170,10 +168,10 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
baseName+"-"+strconv.FormatInt(i+1, 10),
io.LimitReader(fi.Reader, chunkSize),
master, fileUrl,
- jwt)
+ ret.Auth)
if e != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
return 0, e
}
cm.Chunks = append(cm.Chunks,
@@ -188,7 +186,7 @@ func (fi FilePart) Upload(maxMB int, master string, secret security.Secret) (ret
err = upload_chunked_file_manifest(fileUrl, &cm, jwt)
if err != nil {
// delete all uploaded chunks
- cm.DeleteChunks(master)
+ cm.DeleteChunks(master, grpcDialOption)
}
} else {
ret, e := Upload(fileUrl, baseName, fi.Reader, false, fi.MimeType, nil, jwt)
diff --git a/weed/operation/sync_volume.go b/weed/operation/sync_volume.go
index e40c7de41..bf81415c9 100644
--- a/weed/operation/sync_volume.go
+++ b/weed/operation/sync_volume.go
@@ -3,6 +3,7 @@ package operation
import (
"context"
"fmt"
+ "google.golang.org/grpc"
"io"
"time"
@@ -11,9 +12,9 @@ import (
"github.com/chrislusf/seaweedfs/weed/util"
)
-func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
+func GetVolumeSyncStatus(server string, grpcDialOption grpc.DialOption, vid uint32) (resp *volume_server_pb.VolumeSyncStatusResponse, err error) {
- WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
+ WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
ctx, cancel := context.WithTimeout(context.Background(), time.Duration(5*time.Second))
defer cancel()
@@ -26,9 +27,9 @@ func GetVolumeSyncStatus(server string, vid uint32) (resp *volume_server_pb.Volu
return
}
-func GetVolumeIdxEntries(server string, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
+func GetVolumeIdxEntries(server string, grpcDialOption grpc.DialOption, vid uint32, eachEntryFn func(key NeedleId, offset Offset, size uint32)) error {
- return WithVolumeServerClient(server, func(client volume_server_pb.VolumeServerClient) error {
+ return WithVolumeServerClient(server, grpcDialOption, func(client volume_server_pb.VolumeServerClient) error {
stream, err := client.VolumeSyncIndex(context.Background(), &volume_server_pb.VolumeSyncIndexRequest{
VolumdId: vid,
})
diff --git a/weed/operation/upload_content.go b/weed/operation/upload_content.go
index 030bf5889..be7b8e69c 100644
--- a/weed/operation/upload_content.go
+++ b/weed/operation/upload_content.go
@@ -58,9 +58,6 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
if isGzipped {
h.Set("Content-Encoding", "gzip")
}
- if jwt != "" {
- h.Set("Authorization", "BEARER "+string(jwt))
- }
file_writer, cp_err := body_writer.CreatePart(h)
if cp_err != nil {
@@ -86,6 +83,9 @@ func upload_content(uploadUrl string, fillBufferFunction func(w io.Writer) error
for k, v := range pairMap {
req.Header.Set(k, v)
}
+ if jwt != "" {
+ req.Header.Set("Authorization", "BEARER "+string(jwt))
+ }
resp, post_err := client.Do(req)
if post_err != nil {
glog.V(0).Infoln("failing to upload to", uploadUrl, post_err.Error())