aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--go.mod5
-rw-r--r--go.sum25
-rw-r--r--test/s3/multipart/aws_upload.go1
-rw-r--r--unmaintained/change_superblock/change_superblock.go20
-rw-r--r--unmaintained/diff_volume_servers/diff_volume_servers.go16
-rw-r--r--unmaintained/fix_dat/fix_dat.go10
-rw-r--r--unmaintained/s3/presigned_put/presigned_put.go11
-rw-r--r--unmaintained/stream_read_volume/stream_read_volume.go2
-rw-r--r--unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go2
-rw-r--r--unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go2
-rw-r--r--unmaintained/volume_tailer/volume_tailer.go4
-rw-r--r--weed/filer/redis2/redis_store.go12
-rw-r--r--weed/mount/filehandle.go8
-rw-r--r--weed/mq/broker/broker_grpc_pub.go1
-rw-r--r--weed/query/engine/arithmetic_functions.go12
-rw-r--r--weed/query/engine/arithmetic_functions_test.go314
-rw-r--r--weed/query/engine/datetime_functions.go24
-rw-r--r--weed/remote_storage/azure/azure_highlevel.go120
-rw-r--r--weed/remote_storage/azure/azure_storage_client.go287
-rw-r--r--weed/remote_storage/azure/azure_storage_client_test.go377
-rw-r--r--weed/replication/sink/azuresink/azure_sink.go92
-rw-r--r--weed/replication/sink/azuresink/azure_sink_test.go355
-rw-r--r--weed/s3api/auth_credentials.go8
-rw-r--r--weed/s3api/filer_multipart.go2
-rw-r--r--weed/s3api/policy_engine/types.go2
-rw-r--r--weed/s3api/s3_list_parts_action_test.go52
-rw-r--r--weed/s3api/s3_sse_multipart_test.go2
-rw-r--r--weed/s3api/s3api_object_handlers_put.go2
-rw-r--r--weed/server/filer_server_handlers_write.go2
-rw-r--r--weed/server/filer_server_handlers_write_autochunk.go2
-rw-r--r--weed/server/master_grpc_server_volume.go2
-rw-r--r--weed/shell/command_volume_check_disk.go10
32 files changed, 1242 insertions, 542 deletions
diff --git a/go.mod b/go.mod
index b0dd04ae9..ff1a92f7e 100644
--- a/go.mod
+++ b/go.mod
@@ -8,8 +8,6 @@ require (
cloud.google.com/go v0.121.6 // indirect
cloud.google.com/go/pubsub v1.50.1
cloud.google.com/go/storage v1.56.2
- github.com/Azure/azure-pipeline-go v0.2.3
- github.com/Azure/azure-storage-blob-go v0.15.0
github.com/Shopify/sarama v1.38.1
github.com/aws/aws-sdk-go v1.55.8
github.com/beorn7/perks v1.0.1 // indirect
@@ -57,7 +55,6 @@ require (
github.com/kurin/blazer v0.5.3
github.com/linxGnu/grocksdb v1.10.2
github.com/mailru/easyjson v0.7.7 // indirect
- github.com/mattn/go-ieproxy v0.0.11 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
@@ -232,7 +229,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.19.1
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.12.0
github.com/Azure/azure-sdk-for-go/sdk/internal v1.11.2 // indirect
- github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 // indirect
+ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.5.2 // indirect
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 // indirect
github.com/AzureAD/microsoft-authentication-library-for-go v1.5.0 // indirect
diff --git a/go.sum b/go.sum
index ed5920fa8..23c4743d8 100644
--- a/go.sum
+++ b/go.sum
@@ -541,8 +541,6 @@ filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA=
filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4=
gioui.org v0.0.0-20210308172011-57750fc8a0a6/go.mod h1:RSH6KIUZ0p2xy5zHDxgAM4zumjgTw83q2ge/PI+yyw8=
git.sr.ht/~sbinet/gg v0.3.1/go.mod h1:KGYtlADtqsqANL9ueOFkWymvzUvLMQllU5Ixo+8v3pc=
-github.com/Azure/azure-pipeline-go v0.2.3 h1:7U9HBg1JFK3jHl5qmo4CTZKFTVgMwdFHMVtCdfBE21U=
-github.com/Azure/azure-pipeline-go v0.2.3/go.mod h1:x841ezTBIMG6O3lAcl8ATHnsOPVl2bqk7S3ta6S6u4k=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.19.1 h1:5YTBM8QDVIBN3sxBil89WfdAAqDZbyJTgh688DSxX5w=
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.19.1/go.mod h1:YD5h/ldMsG0XiIw7PdyNhLxaM317eFh5yNLccNfGdyw=
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.12.0 h1:wL5IEG5zb7BVv1Kv0Xm92orq+5hB5Nipn3B5tn4Rqfk=
@@ -561,20 +559,7 @@ github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2 h1:FwladfywkNirM+FZY
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.6.2/go.mod h1:vv5Ad0RrIoT1lJFdWBZwt4mB1+j+V8DUroixmKDTCdk=
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.5.2 h1:l3SabZmNuXCMCbQUIeR4W6/N4j8SeH/lwX+a6leZhHo=
github.com/Azure/azure-sdk-for-go/sdk/storage/azfile v1.5.2/go.mod h1:k+mEZ4f1pVqZTRqtSDW2AhZ/3wT5qLpsUA75C/k7dtE=
-github.com/Azure/azure-storage-blob-go v0.15.0 h1:rXtgp8tN1p29GvpGgfJetavIG0V7OgcSXPpwp3tx6qk=
-github.com/Azure/azure-storage-blob-go v0.15.0/go.mod h1:vbjsVbX0dlxnRc4FFMPsS9BsJWPcne7GB7onqlPvz58=
github.com/Azure/go-ansiterm v0.0.0-20170929234023-d6e3b3328b78/go.mod h1:LmzpDX56iTiv29bbRTIsUNlaFfuhWRQBWjQdVyAevI8=
-github.com/Azure/go-autorest v14.2.0+incompatible h1:V5VMDjClD3GiElqLWO7mz2MxNAK/vTfRHdAubSIPRgs=
-github.com/Azure/go-autorest v14.2.0+incompatible/go.mod h1:r+4oMnoxhatjLLJ6zxSWATqVooLgysK6ZNox3g/xq24=
-github.com/Azure/go-autorest/autorest/adal v0.9.13 h1:Mp5hbtOePIzM8pJVRa3YLrWWmZtoxRXqUEzCfJt3+/Q=
-github.com/Azure/go-autorest/autorest/adal v0.9.13/go.mod h1:W/MM4U6nLxnIskrw4UwWzlHfGjwUS50aOsc/I3yuU8M=
-github.com/Azure/go-autorest/autorest/date v0.3.0 h1:7gUk1U5M/CQbp9WoqinNzJar+8KY+LPI6wiWrP/myHw=
-github.com/Azure/go-autorest/autorest/date v0.3.0/go.mod h1:BI0uouVdmngYNUzGWeSYnokU+TrmwEsOqdt8Y6sso74=
-github.com/Azure/go-autorest/autorest/mocks v0.4.1/go.mod h1:LTp+uSrOhSkaKrUy935gNZuuIPPVsHlr9DSOxSayd+k=
-github.com/Azure/go-autorest/logger v0.2.1 h1:IG7i4p/mDa2Ce4TRyAO8IHnVhAVF3RFU+ZtXWSmf4Tg=
-github.com/Azure/go-autorest/logger v0.2.1/go.mod h1:T9E3cAhj2VqvPOtCYAvby9aBXkZmbF5NWuPV8+WeEW8=
-github.com/Azure/go-autorest/tracing v0.6.0 h1:TYi4+3m5t6K48TGI9AUdb+IzbnSxvnvUMfuitfgcfuo=
-github.com/Azure/go-autorest/tracing v0.6.0/go.mod h1:+vhtPC754Xsa23ID7GlGsrdKBpUA79WCAKPPZVC2DeU=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358 h1:mFRzDkZVAjdal+s7s0MwaRv9igoPqLRdzOLzw/8Xvq8=
github.com/Azure/go-ntlmssp v0.0.0-20221128193559-754e69321358/go.mod h1:chxPXzSsl7ZWRAuOIE23GDNzjWuZquvFlgA8xmpunjU=
github.com/AzureAD/microsoft-authentication-extensions-for-go/cache v0.1.1 h1:WJTmL004Abzc5wDB5VtZG2PJk5ndYDgVacGqfirKxjM=
@@ -929,8 +914,6 @@ github.com/flynn/noise v1.1.0 h1:KjPQoQCEFdZDiP03phOvGi11+SVVhBG2wOWAorLsstg=
github.com/flynn/noise v1.1.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/fogleman/gg v1.3.0/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
-github.com/form3tech-oss/jwt-go v3.2.2+incompatible h1:TcekIExNqud5crz4xD2pavyTgWiPvpYe4Xau31I0PRk=
-github.com/form3tech-oss/jwt-go v3.2.2+incompatible/go.mod h1:pbq4aXjuKjdthFRnoDwaVPLA+WlJuPGy+QneDUgJi2k=
github.com/fortytw2/leaktest v1.3.0 h1:u8491cBMTQ8ft8aeV+adlcytMZylmA5nnwwkRZjI8vw=
github.com/fortytw2/leaktest v1.3.0/go.mod h1:jDsjWgpAGjm2CA7WthBh/CdZYEPF31XHquHwclZch5g=
github.com/frankban/quicktest v1.14.6 h1:7Xjx+VpznH+oBnejlPUj8oUpdxnVs4f8XU8WnHkI4W8=
@@ -1398,9 +1381,6 @@ github.com/mattn/go-colorable v0.1.9/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope
github.com/mattn/go-colorable v0.1.12/go.mod h1:u5H1YNBxpqRaxsYJYSkiCWKzEfiAb1Gb520KVy5xxl4=
github.com/mattn/go-colorable v0.1.14 h1:9A9LHSqF/7dyVVX6g0U9cwm9pG3kP9gSzcuIPHPsaIE=
github.com/mattn/go-colorable v0.1.14/go.mod h1:6LmQG8QLFO4G5z1gPvYEzlUgJ2wF+stgPZH1UqBm1s8=
-github.com/mattn/go-ieproxy v0.0.1/go.mod h1:pYabZ6IHcRpFh7vIaLfK7rdcWgFEb3SFJ6/gNWuh88E=
-github.com/mattn/go-ieproxy v0.0.11 h1:MQ/5BuGSgDAHZOJe6YY80IF2UVCfGkwfo6AeD7HtHYo=
-github.com/mattn/go-ieproxy v0.0.11/go.mod h1:/NsJd+kxZBmjMc5hrJCKMbP57B84rvq9BiDRbtO9AS0=
github.com/mattn/go-isatty v0.0.3/go.mod h1:M+lRXTBqGeGNdLjl/ufCoiOlB5xdOkqRJdNxMWT7Zi4=
github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
@@ -1920,8 +1900,6 @@ golang.org/x/crypto v0.0.0-20190820162420-60c769a6c586/go.mod h1:yigFU9vqHzYiE8U
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200323165209-0ec3e9974c59/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
-golang.org/x/crypto v0.0.0-20201002170205-7f63de1d35b0/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
-golang.org/x/crypto v0.0.0-20201016220609-9e8e0b390897/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210421170649-83a5a9bb288b/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
@@ -2023,7 +2001,6 @@ golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLL
golang.org/x/net v0.0.0-20190628185345-da137c7871d7/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20190724013045-ca1201d0de80/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191003171128-d98b1b443823/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
-golang.org/x/net v0.0.0-20191112182307-2180aed22343/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20191209160850-c0dbc17a3553/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200202094626-16171245cfb2/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
@@ -2048,7 +2025,6 @@ golang.org/x/net v0.0.0-20210226172049-e18ecbb05110/go.mod h1:m0MpNAwzfU5UDzcl9v
golang.org/x/net v0.0.0-20210316092652-d523dce5a7f4/go.mod h1:RBQZq4jEuRlivfhVLdyRGr576XBO4/greRjx4P4O3yc=
golang.org/x/net v0.0.0-20210405180319-a5a99cb37ef4/go.mod h1:p54w0d4576C0XHj96bSt6lcn1PtDYWL6XObtHCRCNQM=
golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
-golang.org/x/net v0.0.0-20210610132358-84b48f89b13b/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210813160813-60bc85c4be6d/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
@@ -2152,7 +2128,6 @@ golang.org/x/sys v0.0.0-20190726091711-fc99dfbffb4e/go.mod h1:h1NjWce9XRLGQEsW7w
golang.org/x/sys v0.0.0-20190916202348-b4ddaad3f8a3/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191001151750-bb3f8db39f24/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
-golang.org/x/sys v0.0.0-20191112214154-59a1497f0cea/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20191228213918-04cbcbbfeed8/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200106162015-b016eb3dc98e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
diff --git a/test/s3/multipart/aws_upload.go b/test/s3/multipart/aws_upload.go
index 0553bd403..fbb1cb879 100644
--- a/test/s3/multipart/aws_upload.go
+++ b/test/s3/multipart/aws_upload.go
@@ -108,7 +108,6 @@ func main() {
fmt.Printf("part %d: %v\n", i, part)
}
-
completeResponse, err := completeMultipartUpload(svc, resp, completedParts)
if err != nil {
fmt.Println(err.Error())
diff --git a/unmaintained/change_superblock/change_superblock.go b/unmaintained/change_superblock/change_superblock.go
index 52368f8cd..a9bb1fe16 100644
--- a/unmaintained/change_superblock/change_superblock.go
+++ b/unmaintained/change_superblock/change_superblock.go
@@ -26,15 +26,15 @@ var (
This is to change replication factor in .dat file header. Need to shut down the volume servers
that has those volumes.
-1. fix the .dat file in place
- // just see the replication setting
- go run change_replication.go -volumeId=9 -dir=/Users/chrislu/Downloads
- Current Volume Replication: 000
- // fix the replication setting
- go run change_replication.go -volumeId=9 -dir=/Users/chrislu/Downloads -replication 001
- Current Volume Replication: 000
- Changing to: 001
- Done.
+ 1. fix the .dat file in place
+ // just see the replication setting
+ go run change_replication.go -volumeId=9 -dir=/Users/chrislu/Downloads
+ Current Volume Replication: 000
+ // fix the replication setting
+ go run change_replication.go -volumeId=9 -dir=/Users/chrislu/Downloads -replication 001
+ Current Volume Replication: 000
+ Changing to: 001
+ Done.
2. copy the fixed .dat and related .idx files to some remote server
3. restart volume servers or start new volume servers.
@@ -42,7 +42,7 @@ that has those volumes.
func main() {
flag.Parse()
util_http.NewGlobalHttpClient()
-
+
fileName := strconv.Itoa(*fixVolumeId)
if *fixVolumeCollection != "" {
fileName = *fixVolumeCollection + "_" + fileName
diff --git a/unmaintained/diff_volume_servers/diff_volume_servers.go b/unmaintained/diff_volume_servers/diff_volume_servers.go
index e289fefe8..b4ceeb58c 100644
--- a/unmaintained/diff_volume_servers/diff_volume_servers.go
+++ b/unmaintained/diff_volume_servers/diff_volume_servers.go
@@ -19,8 +19,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/storage/types"
"github.com/seaweedfs/seaweedfs/weed/util"
- "google.golang.org/grpc"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ "google.golang.org/grpc"
)
var (
@@ -31,18 +31,18 @@ var (
)
/*
- Diff the volume's files across multiple volume servers.
- diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5
+Diff the volume's files across multiple volume servers.
+diff_volume_servers -volumeServers 127.0.0.1:8080,127.0.0.1:8081 -volumeId 5
- Example Output:
- reference 127.0.0.1:8081
- fileId volumeServer message
- 5,01617c3f61 127.0.0.1:8080 wrongSize
+Example Output:
+reference 127.0.0.1:8081
+fileId volumeServer message
+5,01617c3f61 127.0.0.1:8080 wrongSize
*/
func main() {
flag.Parse()
util_http.InitGlobalHttpClient()
-
+
util.LoadSecurityConfiguration()
grpcDialOption = security.LoadClientTLS(util.GetViper(), "grpc.client")
diff --git a/unmaintained/fix_dat/fix_dat.go b/unmaintained/fix_dat/fix_dat.go
index 164b5b238..5f1ea1375 100644
--- a/unmaintained/fix_dat/fix_dat.go
+++ b/unmaintained/fix_dat/fix_dat.go
@@ -28,12 +28,12 @@ This is to resolve an one-time issue that caused inconsistency with .dat and .id
In this case, the .dat file contains all data, but some deletion caused incorrect offset.
The .idx has all correct offsets.
-1. fix the .dat file, a new .dat_fixed file will be generated.
- go run fix_dat.go -volumeId=9 -dir=/Users/chrislu/Downloads
-2. move the original .dat and .idx files to some backup folder, and rename .dat_fixed to .dat file
+ 1. fix the .dat file, a new .dat_fixed file will be generated.
+ go run fix_dat.go -volumeId=9 -dir=/Users/chrislu/Downloads
+ 2. move the original .dat and .idx files to some backup folder, and rename .dat_fixed to .dat file
mv 9.dat_fixed 9.dat
-3. fix the .idx file with the "weed fix"
- weed fix -volumeId=9 -dir=/Users/chrislu/Downloads
+ 3. fix the .idx file with the "weed fix"
+ weed fix -volumeId=9 -dir=/Users/chrislu/Downloads
*/
func main() {
flag.Parse()
diff --git a/unmaintained/s3/presigned_put/presigned_put.go b/unmaintained/s3/presigned_put/presigned_put.go
index 1e591dff2..46e4cbf06 100644
--- a/unmaintained/s3/presigned_put/presigned_put.go
+++ b/unmaintained/s3/presigned_put/presigned_put.go
@@ -7,22 +7,25 @@ import (
"github.com/aws/aws-sdk-go/aws"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/service/s3"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"net/http"
"strings"
"time"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
// Downloads an item from an S3 Bucket in the region configured in the shared config
// or AWS_REGION environment variable.
//
// Usage:
-// go run presigned_put.go
+//
+// go run presigned_put.go
+//
// For this exampl to work, the domainName is needd
-// weed s3 -domainName=localhost
+//
+// weed s3 -domainName=localhost
func main() {
util_http.InitGlobalHttpClient()
-
+
h := md5.New()
content := strings.NewReader(stringContent)
content.WriteTo(h)
diff --git a/unmaintained/stream_read_volume/stream_read_volume.go b/unmaintained/stream_read_volume/stream_read_volume.go
index cfdb36815..b148e4a4a 100644
--- a/unmaintained/stream_read_volume/stream_read_volume.go
+++ b/unmaintained/stream_read_volume/stream_read_volume.go
@@ -12,8 +12,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/pb/volume_server_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/util"
- "google.golang.org/grpc"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ "google.golang.org/grpc"
)
var (
diff --git a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go
index 6dc703dbc..a98da1d01 100644
--- a/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go
+++ b/unmaintained/stress_filer_upload/bench_filer_upload/bench_filer_upload.go
@@ -4,6 +4,7 @@ import (
"bytes"
"flag"
"fmt"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"io"
"log"
"math/rand"
@@ -13,7 +14,6 @@ import (
"strings"
"sync"
"time"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
diff --git a/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go
index 1cdcad0b3..1c3befe3d 100644
--- a/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go
+++ b/unmaintained/stress_filer_upload/stress_filer_upload_actual/stress_filer_upload.go
@@ -4,6 +4,7 @@ import (
"bytes"
"flag"
"fmt"
+ util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
"io"
"log"
"math/rand"
@@ -14,7 +15,6 @@ import (
"strings"
"sync"
"time"
- util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
var (
diff --git a/unmaintained/volume_tailer/volume_tailer.go b/unmaintained/volume_tailer/volume_tailer.go
index a75a095d4..03f728ad0 100644
--- a/unmaintained/volume_tailer/volume_tailer.go
+++ b/unmaintained/volume_tailer/volume_tailer.go
@@ -1,18 +1,18 @@
package main
import (
+ "context"
"flag"
"github.com/seaweedfs/seaweedfs/weed/pb"
"log"
"time"
- "context"
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/security"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
util2 "github.com/seaweedfs/seaweedfs/weed/util"
- "golang.org/x/tools/godoc/util"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
+ "golang.org/x/tools/godoc/util"
)
var (
diff --git a/weed/filer/redis2/redis_store.go b/weed/filer/redis2/redis_store.go
index 5e7bc019e..f9322be42 100644
--- a/weed/filer/redis2/redis_store.go
+++ b/weed/filer/redis2/redis_store.go
@@ -61,14 +61,14 @@ func (store *Redis2Store) initialize(hostPort string, password string, database
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{clientCert},
- RootCAs: caCertPool,
- ServerName: redisHost,
- MinVersion: tls.VersionTLS12,
+ RootCAs: caCertPool,
+ ServerName: redisHost,
+ MinVersion: tls.VersionTLS12,
}
store.Client = redis.NewClient(&redis.Options{
- Addr: hostPort,
- Password: password,
- DB: database,
+ Addr: hostPort,
+ Password: password,
+ DB: database,
TLSConfig: tlsConfig,
})
} else {
diff --git a/weed/mount/filehandle.go b/weed/mount/filehandle.go
index d3836754f..c20f9eca8 100644
--- a/weed/mount/filehandle.go
+++ b/weed/mount/filehandle.go
@@ -89,23 +89,23 @@ func (fh *FileHandle) SetEntry(entry *filer_pb.Entry) {
glog.Fatalf("setting file handle entry to nil")
}
fh.entry.SetEntry(entry)
-
+
// Invalidate chunk offset cache since chunks may have changed
fh.invalidateChunkCache()
}
func (fh *FileHandle) UpdateEntry(fn func(entry *filer_pb.Entry)) *filer_pb.Entry {
result := fh.entry.UpdateEntry(fn)
-
+
// Invalidate chunk offset cache since entry may have been modified
fh.invalidateChunkCache()
-
+
return result
}
func (fh *FileHandle) AddChunks(chunks []*filer_pb.FileChunk) {
fh.entry.AppendChunks(chunks)
-
+
// Invalidate chunk offset cache since new chunks were added
fh.invalidateChunkCache()
}
diff --git a/weed/mq/broker/broker_grpc_pub.go b/weed/mq/broker/broker_grpc_pub.go
index 3521a0df2..18f6df8a0 100644
--- a/weed/mq/broker/broker_grpc_pub.go
+++ b/weed/mq/broker/broker_grpc_pub.go
@@ -183,4 +183,3 @@ func findClientAddress(ctx context.Context) string {
}
return pr.Addr.String()
}
-
diff --git a/weed/query/engine/arithmetic_functions.go b/weed/query/engine/arithmetic_functions.go
index fd8ac1684..e2237e31b 100644
--- a/weed/query/engine/arithmetic_functions.go
+++ b/weed/query/engine/arithmetic_functions.go
@@ -15,11 +15,11 @@ import (
type ArithmeticOperator string
const (
- OpAdd ArithmeticOperator = "+"
- OpSub ArithmeticOperator = "-"
- OpMul ArithmeticOperator = "*"
- OpDiv ArithmeticOperator = "/"
- OpMod ArithmeticOperator = "%"
+ OpAdd ArithmeticOperator = "+"
+ OpSub ArithmeticOperator = "-"
+ OpMul ArithmeticOperator = "*"
+ OpDiv ArithmeticOperator = "/"
+ OpMod ArithmeticOperator = "%"
)
// EvaluateArithmeticExpression evaluates basic arithmetic operations between two values
@@ -69,7 +69,7 @@ func (e *SQLEngine) EvaluateArithmeticExpression(left, right *schema_pb.Value, o
// Convert result back to appropriate schema value type
// If both operands were integers and operation doesn't produce decimal, return integer
- if e.isIntegerValue(left) && e.isIntegerValue(right) &&
+ if e.isIntegerValue(left) && e.isIntegerValue(right) &&
(operator == OpAdd || operator == OpSub || operator == OpMul || operator == OpMod) {
return &schema_pb.Value{
Kind: &schema_pb.Value_Int64Value{Int64Value: int64(result)},
diff --git a/weed/query/engine/arithmetic_functions_test.go b/weed/query/engine/arithmetic_functions_test.go
index 8c5e11dec..f07ada54f 100644
--- a/weed/query/engine/arithmetic_functions_test.go
+++ b/weed/query/engine/arithmetic_functions_test.go
@@ -10,131 +10,131 @@ func TestArithmeticOperations(t *testing.T) {
engine := NewTestSQLEngine()
tests := []struct {
- name string
- left *schema_pb.Value
- right *schema_pb.Value
- operator ArithmeticOperator
- expected *schema_pb.Value
- expectErr bool
+ name string
+ left *schema_pb.Value
+ right *schema_pb.Value
+ operator ArithmeticOperator
+ expected *schema_pb.Value
+ expectErr bool
}{
// Addition tests
{
- name: "Add two integers",
- left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}},
- right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
- operator: OpAdd,
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 15}},
+ name: "Add two integers",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}},
+ right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ operator: OpAdd,
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 15}},
expectErr: false,
},
{
- name: "Add integer and float",
- left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}},
- right: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 5.5}},
- operator: OpAdd,
- expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 15.5}},
+ name: "Add integer and float",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}},
+ right: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 5.5}},
+ operator: OpAdd,
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 15.5}},
expectErr: false,
},
// Subtraction tests
{
- name: "Subtract two integers",
- left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}},
- right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 3}},
- operator: OpSub,
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 7}},
+ name: "Subtract two integers",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}},
+ right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 3}},
+ operator: OpSub,
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 7}},
expectErr: false,
},
// Multiplication tests
{
- name: "Multiply two integers",
- left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 6}},
- right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 7}},
- operator: OpMul,
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 42}},
+ name: "Multiply two integers",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 6}},
+ right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 7}},
+ operator: OpMul,
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 42}},
expectErr: false,
},
{
- name: "Multiply with float",
- left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
- right: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}},
- operator: OpMul,
- expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 12.5}},
+ name: "Multiply with float",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ right: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}},
+ operator: OpMul,
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 12.5}},
expectErr: false,
},
// Division tests
{
- name: "Divide two integers",
- left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 20}},
- right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 4}},
- operator: OpDiv,
- expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 5.0}},
+ name: "Divide two integers",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 20}},
+ right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 4}},
+ operator: OpDiv,
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 5.0}},
expectErr: false,
},
{
- name: "Division by zero",
- left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}},
- right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}},
- operator: OpDiv,
- expected: nil,
+ name: "Division by zero",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}},
+ right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}},
+ operator: OpDiv,
+ expected: nil,
expectErr: true,
},
// Modulo tests
{
- name: "Modulo operation",
- left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 17}},
- right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
- operator: OpMod,
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 2}},
+ name: "Modulo operation",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 17}},
+ right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ operator: OpMod,
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 2}},
expectErr: false,
},
{
- name: "Modulo by zero",
- left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}},
- right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}},
- operator: OpMod,
- expected: nil,
+ name: "Modulo by zero",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 10}},
+ right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}},
+ operator: OpMod,
+ expected: nil,
expectErr: true,
},
// String conversion tests
{
- name: "Add string number to integer",
- left: &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "15"}},
- right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
- operator: OpAdd,
- expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 20.0}},
+ name: "Add string number to integer",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "15"}},
+ right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ operator: OpAdd,
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 20.0}},
expectErr: false,
},
{
- name: "Invalid string conversion",
- left: &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "not_a_number"}},
- right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
- operator: OpAdd,
- expected: nil,
+ name: "Invalid string conversion",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_StringValue{StringValue: "not_a_number"}},
+ right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ operator: OpAdd,
+ expected: nil,
expectErr: true,
},
// Boolean conversion tests
{
- name: "Add boolean to integer",
- left: &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: true}},
- right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
- operator: OpAdd,
- expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 6.0}},
+ name: "Add boolean to integer",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_BoolValue{BoolValue: true}},
+ right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ operator: OpAdd,
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 6.0}},
expectErr: false,
},
// Null value tests
{
- name: "Add with null left operand",
- left: nil,
- right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
- operator: OpAdd,
- expected: nil,
+ name: "Add with null left operand",
+ left: nil,
+ right: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ operator: OpAdd,
+ expected: nil,
expectErr: true,
},
{
- name: "Add with null right operand",
- left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
- right: nil,
- operator: OpAdd,
- expected: nil,
+ name: "Add with null right operand",
+ left: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ right: nil,
+ operator: OpAdd,
+ expected: nil,
expectErr: true,
},
}
@@ -203,7 +203,7 @@ func TestIndividualArithmeticFunctions(t *testing.T) {
if err != nil {
t.Errorf("Divide function failed: %v", err)
}
- expected = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 10.0/3.0}}
+ expected = &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 10.0 / 3.0}}
if !valuesEqual(result, expected) {
t.Errorf("Divide: Expected %v, got %v", expected, result)
}
@@ -224,45 +224,45 @@ func TestMathematicalFunctions(t *testing.T) {
t.Run("ROUND function tests", func(t *testing.T) {
tests := []struct {
- name string
- value *schema_pb.Value
- precision *schema_pb.Value
- expected *schema_pb.Value
- expectErr bool
+ name string
+ value *schema_pb.Value
+ precision *schema_pb.Value
+ expected *schema_pb.Value
+ expectErr bool
}{
{
- name: "Round float to integer",
- value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.7}},
+ name: "Round float to integer",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.7}},
precision: nil,
- expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 4.0}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 4.0}},
expectErr: false,
},
{
- name: "Round integer stays integer",
- value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ name: "Round integer stays integer",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
precision: nil,
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
expectErr: false,
},
{
- name: "Round with precision 2",
- value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14159}},
+ name: "Round with precision 2",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14159}},
precision: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 2}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}},
expectErr: false,
},
{
- name: "Round negative number",
- value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.7}},
+ name: "Round negative number",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.7}},
precision: nil,
- expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -4.0}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -4.0}},
expectErr: false,
},
{
- name: "Round null value",
- value: nil,
+ name: "Round null value",
+ value: nil,
precision: nil,
- expected: nil,
+ expected: nil,
expectErr: true,
},
}
@@ -299,33 +299,33 @@ func TestMathematicalFunctions(t *testing.T) {
t.Run("CEIL function tests", func(t *testing.T) {
tests := []struct {
- name string
- value *schema_pb.Value
- expected *schema_pb.Value
- expectErr bool
+ name string
+ value *schema_pb.Value
+ expected *schema_pb.Value
+ expectErr bool
}{
{
- name: "Ceil positive decimal",
- value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.2}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 4}},
+ name: "Ceil positive decimal",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.2}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 4}},
expectErr: false,
},
{
- name: "Ceil negative decimal",
- value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.2}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: -3}},
+ name: "Ceil negative decimal",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.2}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: -3}},
expectErr: false,
},
{
- name: "Ceil integer",
- value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ name: "Ceil integer",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
expectErr: false,
},
{
- name: "Ceil null value",
- value: nil,
- expected: nil,
+ name: "Ceil null value",
+ value: nil,
+ expected: nil,
expectErr: true,
},
}
@@ -355,33 +355,33 @@ func TestMathematicalFunctions(t *testing.T) {
t.Run("FLOOR function tests", func(t *testing.T) {
tests := []struct {
- name string
- value *schema_pb.Value
- expected *schema_pb.Value
- expectErr bool
+ name string
+ value *schema_pb.Value
+ expected *schema_pb.Value
+ expectErr bool
}{
{
- name: "Floor positive decimal",
- value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.8}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 3}},
+ name: "Floor positive decimal",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.8}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 3}},
expectErr: false,
},
{
- name: "Floor negative decimal",
- value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.2}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: -4}},
+ name: "Floor negative decimal",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.2}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: -4}},
expectErr: false,
},
{
- name: "Floor integer",
- value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ name: "Floor integer",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
expectErr: false,
},
{
- name: "Floor null value",
- value: nil,
- expected: nil,
+ name: "Floor null value",
+ value: nil,
+ expected: nil,
expectErr: true,
},
}
@@ -411,57 +411,57 @@ func TestMathematicalFunctions(t *testing.T) {
t.Run("ABS function tests", func(t *testing.T) {
tests := []struct {
- name string
- value *schema_pb.Value
- expected *schema_pb.Value
- expectErr bool
+ name string
+ value *schema_pb.Value
+ expected *schema_pb.Value
+ expectErr bool
}{
{
- name: "Abs positive integer",
- value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ name: "Abs positive integer",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
expectErr: false,
},
{
- name: "Abs negative integer",
- value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: -5}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
+ name: "Abs negative integer",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: -5}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 5}},
expectErr: false,
},
{
- name: "Abs positive double",
- value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}},
+ name: "Abs positive double",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}},
expectErr: false,
},
{
- name: "Abs negative double",
- value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.14}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}},
+ name: "Abs negative double",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: -3.14}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_DoubleValue{DoubleValue: 3.14}},
expectErr: false,
},
{
- name: "Abs positive float",
- value: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}},
+ name: "Abs positive float",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}},
expectErr: false,
},
{
- name: "Abs negative float",
- value: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: -2.5}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}},
+ name: "Abs negative float",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: -2.5}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_FloatValue{FloatValue: 2.5}},
expectErr: false,
},
{
- name: "Abs zero",
- value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}},
- expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}},
+ name: "Abs zero",
+ value: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}},
+ expected: &schema_pb.Value{Kind: &schema_pb.Value_Int64Value{Int64Value: 0}},
expectErr: false,
},
{
- name: "Abs null value",
- value: nil,
- expected: nil,
+ name: "Abs null value",
+ value: nil,
+ expected: nil,
expectErr: true,
},
}
diff --git a/weed/query/engine/datetime_functions.go b/weed/query/engine/datetime_functions.go
index 2ece58e15..9803145f0 100644
--- a/weed/query/engine/datetime_functions.go
+++ b/weed/query/engine/datetime_functions.go
@@ -16,7 +16,7 @@ import (
func (e *SQLEngine) CurrentDate() (*schema_pb.Value, error) {
now := time.Now()
dateStr := now.Format("2006-01-02")
-
+
return &schema_pb.Value{
Kind: &schema_pb.Value_StringValue{StringValue: dateStr},
}, nil
@@ -25,10 +25,10 @@ func (e *SQLEngine) CurrentDate() (*schema_pb.Value, error) {
// CurrentTimestamp returns the current timestamp
func (e *SQLEngine) CurrentTimestamp() (*schema_pb.Value, error) {
now := time.Now()
-
+
// Return as TimestampValue with microseconds
timestampMicros := now.UnixMicro()
-
+
return &schema_pb.Value{
Kind: &schema_pb.Value_TimestampValue{
TimestampValue: &schema_pb.TimestampValue{
@@ -42,7 +42,7 @@ func (e *SQLEngine) CurrentTimestamp() (*schema_pb.Value, error) {
func (e *SQLEngine) CurrentTime() (*schema_pb.Value, error) {
now := time.Now()
timeStr := now.Format("15:04:05")
-
+
return &schema_pb.Value{
Kind: &schema_pb.Value_StringValue{StringValue: timeStr},
}, nil
@@ -61,13 +61,13 @@ func (e *SQLEngine) Now() (*schema_pb.Value, error) {
type DatePart string
const (
- PartYear DatePart = "YEAR"
- PartMonth DatePart = "MONTH"
- PartDay DatePart = "DAY"
- PartHour DatePart = "HOUR"
- PartMinute DatePart = "MINUTE"
- PartSecond DatePart = "SECOND"
- PartWeek DatePart = "WEEK"
+ PartYear DatePart = "YEAR"
+ PartMonth DatePart = "MONTH"
+ PartDay DatePart = "DAY"
+ PartHour DatePart = "HOUR"
+ PartMinute DatePart = "MINUTE"
+ PartSecond DatePart = "SECOND"
+ PartWeek DatePart = "WEEK"
PartDayOfYear DatePart = "DOY"
PartDayOfWeek DatePart = "DOW"
PartQuarter DatePart = "QUARTER"
@@ -172,7 +172,7 @@ func (e *SQLEngine) DateTrunc(precision string, value *schema_pb.Value) (*schema
case "year", "years":
truncated = time.Date(t.Year(), 1, 1, 0, 0, 0, 0, t.Location())
case "decade", "decades":
- year := (t.Year()/10) * 10
+ year := (t.Year() / 10) * 10
truncated = time.Date(year, 1, 1, 0, 0, 0, 0, t.Location())
case "century", "centuries":
year := ((t.Year()-1)/100)*100 + 1
diff --git a/weed/remote_storage/azure/azure_highlevel.go b/weed/remote_storage/azure/azure_highlevel.go
deleted file mode 100644
index a5cd4070b..000000000
--- a/weed/remote_storage/azure/azure_highlevel.go
+++ /dev/null
@@ -1,120 +0,0 @@
-package azure
-
-import (
- "context"
- "crypto/rand"
- "encoding/base64"
- "errors"
- "fmt"
- "github.com/Azure/azure-pipeline-go/pipeline"
- . "github.com/Azure/azure-storage-blob-go/azblob"
- "io"
- "sync"
-)
-
-// copied from https://github.com/Azure/azure-storage-blob-go/blob/master/azblob/highlevel.go#L73:6
-// uploadReaderAtToBlockBlob was not public
-
-// uploadReaderAtToBlockBlob uploads a buffer in blocks to a block blob.
-func uploadReaderAtToBlockBlob(ctx context.Context, reader io.ReaderAt, readerSize int64,
- blockBlobURL BlockBlobURL, o UploadToBlockBlobOptions) (CommonResponse, error) {
- if o.BlockSize == 0 {
- // If bufferSize > (BlockBlobMaxStageBlockBytes * BlockBlobMaxBlocks), then error
- if readerSize > BlockBlobMaxStageBlockBytes*BlockBlobMaxBlocks {
- return nil, errors.New("buffer is too large to upload to a block blob")
- }
- // If bufferSize <= BlockBlobMaxUploadBlobBytes, then Upload should be used with just 1 I/O request
- if readerSize <= BlockBlobMaxUploadBlobBytes {
- o.BlockSize = BlockBlobMaxUploadBlobBytes // Default if unspecified
- } else {
- o.BlockSize = readerSize / BlockBlobMaxBlocks // buffer / max blocks = block size to use all 50,000 blocks
- if o.BlockSize < BlobDefaultDownloadBlockSize { // If the block size is smaller than 4MB, round up to 4MB
- o.BlockSize = BlobDefaultDownloadBlockSize
- }
- // StageBlock will be called with blockSize blocks and a Parallelism of (BufferSize / BlockSize).
- }
- }
-
- if readerSize <= BlockBlobMaxUploadBlobBytes {
- // If the size can fit in 1 Upload call, do it this way
- var body io.ReadSeeker = io.NewSectionReader(reader, 0, readerSize)
- if o.Progress != nil {
- body = pipeline.NewRequestBodyProgress(body, o.Progress)
- }
- return blockBlobURL.Upload(ctx, body, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions, o.ImmutabilityPolicyOptions)
- }
-
- var numBlocks = uint16(((readerSize - 1) / o.BlockSize) + 1)
-
- blockIDList := make([]string, numBlocks) // Base-64 encoded block IDs
- progress := int64(0)
- progressLock := &sync.Mutex{}
-
- err := DoBatchTransfer(ctx, BatchTransferOptions{
- OperationName: "uploadReaderAtToBlockBlob",
- TransferSize: readerSize,
- ChunkSize: o.BlockSize,
- Parallelism: o.Parallelism,
- Operation: func(offset int64, count int64, ctx context.Context) error {
- // This function is called once per block.
- // It is passed this block's offset within the buffer and its count of bytes
- // Prepare to read the proper block/section of the buffer
- var body io.ReadSeeker = io.NewSectionReader(reader, offset, count)
- blockNum := offset / o.BlockSize
- if o.Progress != nil {
- blockProgress := int64(0)
- body = pipeline.NewRequestBodyProgress(body,
- func(bytesTransferred int64) {
- diff := bytesTransferred - blockProgress
- blockProgress = bytesTransferred
- progressLock.Lock() // 1 goroutine at a time gets a progress report
- progress += diff
- o.Progress(progress)
- progressLock.Unlock()
- })
- }
-
- // Block IDs are unique values to avoid issue if 2+ clients are uploading blocks
- // at the same time causing PutBlockList to get a mix of blocks from all the clients.
- blockIDList[blockNum] = base64.StdEncoding.EncodeToString(newUUID().bytes())
- _, err := blockBlobURL.StageBlock(ctx, blockIDList[blockNum], body, o.AccessConditions.LeaseAccessConditions, nil, o.ClientProvidedKeyOptions)
- return err
- },
- })
- if err != nil {
- return nil, err
- }
- // All put blocks were successful, call Put Block List to finalize the blob
- return blockBlobURL.CommitBlockList(ctx, blockIDList, o.BlobHTTPHeaders, o.Metadata, o.AccessConditions, o.BlobAccessTier, o.BlobTagsMap, o.ClientProvidedKeyOptions, o.ImmutabilityPolicyOptions)
-}
-
-// The UUID reserved variants.
-const (
- reservedNCS byte = 0x80
- reservedRFC4122 byte = 0x40
- reservedMicrosoft byte = 0x20
- reservedFuture byte = 0x00
-)
-
-type uuid [16]byte
-
-// NewUUID returns a new uuid using RFC 4122 algorithm.
-func newUUID() (u uuid) {
- u = uuid{}
- // Set all bits to randomly (or pseudo-randomly) chosen values.
- rand.Read(u[:])
- u[8] = (u[8] | reservedRFC4122) & 0x7F // u.setVariant(ReservedRFC4122)
-
- var version byte = 4
- u[6] = (u[6] & 0xF) | (version << 4) // u.setVersion(4)
- return
-}
-
-// String returns an unparsed version of the generated UUID sequence.
-func (u uuid) String() string {
- return fmt.Sprintf("%x-%x-%x-%x-%x", u[0:4], u[4:6], u[6:8], u[8:10], u[10:])
-}
-
-func (u uuid) bytes() []byte {
- return u[:]
-}
diff --git a/weed/remote_storage/azure/azure_storage_client.go b/weed/remote_storage/azure/azure_storage_client.go
index 8183c77a4..bfedd68e2 100644
--- a/weed/remote_storage/azure/azure_storage_client.go
+++ b/weed/remote_storage/azure/azure_storage_client.go
@@ -3,21 +3,58 @@ package azure
import (
"context"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"io"
- "net/url"
"os"
"reflect"
+ "regexp"
"strings"
-
- "github.com/Azure/azure-storage-blob-go/azblob"
- "github.com/seaweedfs/seaweedfs/weed/filer"
+ "time"
+
+ "github.com/Azure/azure-sdk-for-go/sdk/azcore"
+ "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
+ "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blockblob"
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
"github.com/seaweedfs/seaweedfs/weed/remote_storage"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
"github.com/seaweedfs/seaweedfs/weed/util"
)
+const (
+ defaultBlockSize = 4 * 1024 * 1024
+ defaultConcurrency = 16
+)
+
+// invalidMetadataChars matches any character that is not valid in Azure metadata keys.
+// Azure metadata keys must be valid C# identifiers: letters, digits, and underscores only.
+var invalidMetadataChars = regexp.MustCompile(`[^a-zA-Z0-9_]`)
+
+// sanitizeMetadataKey converts an S3 metadata key to a valid Azure metadata key.
+// Azure metadata keys must be valid C# identifiers (letters, digits, underscores only, cannot start with digit).
+// To prevent collisions, invalid characters are replaced with their hex representation (_XX_).
+// Examples:
+// - "my-key" -> "my_2d_key"
+// - "my.key" -> "my_2e_key"
+// - "key@value" -> "key_40_value"
+func sanitizeMetadataKey(key string) string {
+ // Replace each invalid character with _XX_ where XX is the hex code
+ result := invalidMetadataChars.ReplaceAllStringFunc(key, func(s string) string {
+ return fmt.Sprintf("_%02x_", s[0])
+ })
+
+ // Azure metadata keys cannot start with a digit
+ if len(result) > 0 && result[0] >= '0' && result[0] <= '9' {
+ result = "_" + result
+ }
+
+ return result
+}
+
func init() {
remote_storage.RemoteStorageClientMakers["azure"] = new(azureRemoteStorageMaker)
}
@@ -42,25 +79,35 @@ func (s azureRemoteStorageMaker) Make(conf *remote_pb.RemoteConf) (remote_storag
}
}
- // Use your Storage account's name and key to create a credential object.
+ // Create credential and client
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
- return nil, fmt.Errorf("invalid Azure credential with account name:%s: %v", accountName, err)
+ return nil, fmt.Errorf("invalid Azure credential with account name:%s: %w", accountName, err)
}
- // Create a request pipeline that is used to process HTTP(S) requests and responses.
- p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
+ serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
+ azClient, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{
+ ClientOptions: azcore.ClientOptions{
+ Retry: policy.RetryOptions{
+ MaxRetries: 10, // Increased from default 3 to maintain resiliency similar to old SDK's 20
+ TryTimeout: time.Minute,
+ RetryDelay: 2 * time.Second,
+ MaxRetryDelay: time.Minute,
+ },
+ },
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to create Azure client: %w", err)
+ }
- // Create an ServiceURL object that wraps the service URL and a request pipeline.
- u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName))
- client.serviceURL = azblob.NewServiceURL(*u, p)
+ client.client = azClient
return client, nil
}
type azureRemoteStorageClient struct {
- conf *remote_pb.RemoteConf
- serviceURL azblob.ServiceURL
+ conf *remote_pb.RemoteConf
+ client *azblob.Client
}
var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})
@@ -68,59 +115,74 @@ var _ = remote_storage.RemoteStorageClient(&azureRemoteStorageClient{})
func (az *azureRemoteStorageClient) Traverse(loc *remote_pb.RemoteStorageLocation, visitFn remote_storage.VisitFunc) (err error) {
pathKey := loc.Path[1:]
- containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
-
- // List the container that we have created above
- for marker := (azblob.Marker{}); marker.NotDone(); {
- // Get a result segment starting with the blob indicated by the current Marker.
- listBlob, err := containerURL.ListBlobsFlatSegment(context.Background(), marker, azblob.ListBlobsSegmentOptions{
- Prefix: pathKey,
- })
+ containerClient := az.client.ServiceClient().NewContainerClient(loc.Bucket)
+
+ // List blobs with pager
+ pager := containerClient.NewListBlobsFlatPager(&container.ListBlobsFlatOptions{
+ Prefix: &pathKey,
+ })
+
+ for pager.More() {
+ resp, err := pager.NextPage(context.Background())
if err != nil {
- return fmt.Errorf("azure traverse %s%s: %v", loc.Bucket, loc.Path, err)
+ return fmt.Errorf("azure traverse %s%s: %w", loc.Bucket, loc.Path, err)
}
- // ListBlobs returns the start of the next segment; you MUST use this to get
- // the next segment (after processing the current result segment).
- marker = listBlob.NextMarker
-
- // Process the blobs returned in this result segment (if the segment is empty, the loop body won't execute)
- for _, blobInfo := range listBlob.Segment.BlobItems {
- key := blobInfo.Name
- key = "/" + key
+ for _, blobItem := range resp.Segment.BlobItems {
+ if blobItem.Name == nil {
+ continue
+ }
+ key := "/" + *blobItem.Name
dir, name := util.FullPath(key).DirAndName()
- err = visitFn(dir, name, false, &filer_pb.RemoteEntry{
- RemoteMtime: blobInfo.Properties.LastModified.Unix(),
- RemoteSize: *blobInfo.Properties.ContentLength,
- RemoteETag: string(blobInfo.Properties.Etag),
+
+ remoteEntry := &filer_pb.RemoteEntry{
StorageName: az.conf.Name,
- })
+ }
+ if blobItem.Properties != nil {
+ if blobItem.Properties.LastModified != nil {
+ remoteEntry.RemoteMtime = blobItem.Properties.LastModified.Unix()
+ }
+ if blobItem.Properties.ContentLength != nil {
+ remoteEntry.RemoteSize = *blobItem.Properties.ContentLength
+ }
+ if blobItem.Properties.ETag != nil {
+ remoteEntry.RemoteETag = string(*blobItem.Properties.ETag)
+ }
+ }
+
+ err = visitFn(dir, name, false, remoteEntry)
if err != nil {
- return fmt.Errorf("azure processing %s%s: %v", loc.Bucket, loc.Path, err)
+ return fmt.Errorf("azure processing %s%s: %w", loc.Bucket, loc.Path, err)
}
}
}
return
}
+
func (az *azureRemoteStorageClient) ReadFile(loc *remote_pb.RemoteStorageLocation, offset int64, size int64) (data []byte, err error) {
key := loc.Path[1:]
- containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
- blobURL := containerURL.NewBlockBlobURL(key)
+ blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlockBlobClient(key)
- downloadResponse, readErr := blobURL.Download(context.Background(), offset, size, azblob.BlobAccessConditions{}, false, azblob.ClientProvidedKeyOptions{})
- if readErr != nil {
- return nil, readErr
+ count := size
+ if count == 0 {
+ count = blob.CountToEnd
}
- // NOTE: automatically retries are performed if the connection fails
- bodyStream := downloadResponse.Body(azblob.RetryReaderOptions{MaxRetryRequests: 20})
- defer bodyStream.Close()
-
- data, err = io.ReadAll(bodyStream)
+ downloadResp, err := blobClient.DownloadStream(context.Background(), &blob.DownloadStreamOptions{
+ Range: blob.HTTPRange{
+ Offset: offset,
+ Count: count,
+ },
+ })
+ if err != nil {
+ return nil, fmt.Errorf("failed to download file %s%s: %w", loc.Bucket, loc.Path, err)
+ }
+ defer downloadResp.Body.Close()
+ data, err = io.ReadAll(downloadResp.Body)
if err != nil {
- return nil, fmt.Errorf("failed to download file %s%s: %v", loc.Bucket, loc.Path, err)
+ return nil, fmt.Errorf("failed to read download stream %s%s: %w", loc.Bucket, loc.Path, err)
}
return
@@ -137,23 +199,23 @@ func (az *azureRemoteStorageClient) RemoveDirectory(loc *remote_pb.RemoteStorage
func (az *azureRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocation, entry *filer_pb.Entry, reader io.Reader) (remoteEntry *filer_pb.RemoteEntry, err error) {
key := loc.Path[1:]
- containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
- blobURL := containerURL.NewBlockBlobURL(key)
+ blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlockBlobClient(key)
- readerAt, ok := reader.(io.ReaderAt)
- if !ok {
- return nil, fmt.Errorf("unexpected reader: readerAt expected")
+ // Upload from reader
+ metadata := toMetadata(entry.Extended)
+ httpHeaders := &blob.HTTPHeaders{}
+ if entry.Attributes != nil && entry.Attributes.Mime != "" {
+ httpHeaders.BlobContentType = &entry.Attributes.Mime
}
- fileSize := int64(filer.FileSize(entry))
- _, err = uploadReaderAtToBlockBlob(context.Background(), readerAt, fileSize, blobURL, azblob.UploadToBlockBlobOptions{
- BlockSize: 4 * 1024 * 1024,
- BlobHTTPHeaders: azblob.BlobHTTPHeaders{ContentType: entry.Attributes.Mime},
- Metadata: toMetadata(entry.Extended),
- Parallelism: 16,
+ _, err = blobClient.UploadStream(context.Background(), reader, &blockblob.UploadStreamOptions{
+ BlockSize: defaultBlockSize,
+ Concurrency: defaultConcurrency,
+ HTTPHeaders: httpHeaders,
+ Metadata: metadata,
})
if err != nil {
- return nil, fmt.Errorf("azure upload to %s%s: %v", loc.Bucket, loc.Path, err)
+ return nil, fmt.Errorf("azure upload to %s%s: %w", loc.Bucket, loc.Path, err)
}
// read back the remote entry
@@ -162,36 +224,45 @@ func (az *azureRemoteStorageClient) WriteFile(loc *remote_pb.RemoteStorageLocati
func (az *azureRemoteStorageClient) readFileRemoteEntry(loc *remote_pb.RemoteStorageLocation) (*filer_pb.RemoteEntry, error) {
key := loc.Path[1:]
- containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
- blobURL := containerURL.NewBlockBlobURL(key)
-
- attr, err := blobURL.GetProperties(context.Background(), azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
+ blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlockBlobClient(key)
+ props, err := blobClient.GetProperties(context.Background(), nil)
if err != nil {
return nil, err
}
- return &filer_pb.RemoteEntry{
- RemoteMtime: attr.LastModified().Unix(),
- RemoteSize: attr.ContentLength(),
- RemoteETag: string(attr.ETag()),
+ remoteEntry := &filer_pb.RemoteEntry{
StorageName: az.conf.Name,
- }, nil
+ }
+
+ if props.LastModified != nil {
+ remoteEntry.RemoteMtime = props.LastModified.Unix()
+ }
+ if props.ContentLength != nil {
+ remoteEntry.RemoteSize = *props.ContentLength
+ }
+ if props.ETag != nil {
+ remoteEntry.RemoteETag = string(*props.ETag)
+ }
+ return remoteEntry, nil
}
-func toMetadata(attributes map[string][]byte) map[string]string {
- metadata := make(map[string]string)
+func toMetadata(attributes map[string][]byte) map[string]*string {
+ metadata := make(map[string]*string)
for k, v := range attributes {
if strings.HasPrefix(k, s3_constants.AmzUserMetaPrefix) {
- metadata[k[len(s3_constants.AmzUserMetaPrefix):]] = string(v)
+ // S3 stores metadata keys in lowercase; normalize for consistency.
+ key := strings.ToLower(k[len(s3_constants.AmzUserMetaPrefix):])
+
+ // Sanitize key to prevent collisions and ensure Azure compliance
+ key = sanitizeMetadataKey(key)
+
+ val := string(v)
+ metadata[key] = &val
}
}
- parsed_metadata := make(map[string]string)
- for k, v := range metadata {
- parsed_metadata[strings.Replace(k, "-", "_", -1)] = v
- }
- return parsed_metadata
+ return metadata
}
func (az *azureRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStorageLocation, oldEntry *filer_pb.Entry, newEntry *filer_pb.Entry) (err error) {
@@ -201,54 +272,68 @@ func (az *azureRemoteStorageClient) UpdateFileMetadata(loc *remote_pb.RemoteStor
metadata := toMetadata(newEntry.Extended)
key := loc.Path[1:]
- containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
+ blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlobClient(key)
- _, err = containerURL.NewBlobURL(key).SetMetadata(context.Background(), metadata, azblob.BlobAccessConditions{}, azblob.ClientProvidedKeyOptions{})
+ _, err = blobClient.SetMetadata(context.Background(), metadata, nil)
return
}
func (az *azureRemoteStorageClient) DeleteFile(loc *remote_pb.RemoteStorageLocation) (err error) {
key := loc.Path[1:]
- containerURL := az.serviceURL.NewContainerURL(loc.Bucket)
- if _, err = containerURL.NewBlobURL(key).Delete(context.Background(),
- azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
- return fmt.Errorf("azure delete %s%s: %v", loc.Bucket, loc.Path, err)
+ blobClient := az.client.ServiceClient().NewContainerClient(loc.Bucket).NewBlobClient(key)
+
+ _, err = blobClient.Delete(context.Background(), &blob.DeleteOptions{
+ DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
+ })
+ if err != nil {
+ // Make delete idempotent - don't return error if blob doesn't exist
+ if bloberror.HasCode(err, bloberror.BlobNotFound) {
+ return nil
+ }
+ return fmt.Errorf("azure delete %s%s: %w", loc.Bucket, loc.Path, err)
}
return
}
func (az *azureRemoteStorageClient) ListBuckets() (buckets []*remote_storage.Bucket, err error) {
- ctx := context.Background()
- for containerMarker := (azblob.Marker{}); containerMarker.NotDone(); {
- listContainer, err := az.serviceURL.ListContainersSegment(ctx, containerMarker, azblob.ListContainersSegmentOptions{})
- if err == nil {
- for _, v := range listContainer.ContainerItems {
- buckets = append(buckets, &remote_storage.Bucket{
- Name: v.Name,
- CreatedAt: v.Properties.LastModified,
- })
- }
- } else {
+ pager := az.client.NewListContainersPager(nil)
+
+ for pager.More() {
+ resp, err := pager.NextPage(context.Background())
+ if err != nil {
return buckets, err
}
- containerMarker = listContainer.NextMarker
+
+ for _, containerItem := range resp.ContainerItems {
+ if containerItem.Name != nil {
+ bucket := &remote_storage.Bucket{
+ Name: *containerItem.Name,
+ }
+ if containerItem.Properties != nil && containerItem.Properties.LastModified != nil {
+ bucket.CreatedAt = *containerItem.Properties.LastModified
+ }
+ buckets = append(buckets, bucket)
+ }
+ }
}
return
}
func (az *azureRemoteStorageClient) CreateBucket(name string) (err error) {
- containerURL := az.serviceURL.NewContainerURL(name)
- if _, err = containerURL.Create(context.Background(), azblob.Metadata{}, azblob.PublicAccessNone); err != nil {
- return fmt.Errorf("create bucket %s: %v", name, err)
+ containerClient := az.client.ServiceClient().NewContainerClient(name)
+ _, err = containerClient.Create(context.Background(), nil)
+ if err != nil {
+ return fmt.Errorf("create bucket %s: %w", name, err)
}
return
}
func (az *azureRemoteStorageClient) DeleteBucket(name string) (err error) {
- containerURL := az.serviceURL.NewContainerURL(name)
- if _, err = containerURL.Delete(context.Background(), azblob.ContainerAccessConditions{}); err != nil {
- return fmt.Errorf("delete bucket %s: %v", name, err)
+ containerClient := az.client.ServiceClient().NewContainerClient(name)
+ _, err = containerClient.Delete(context.Background(), nil)
+ if err != nil {
+ return fmt.Errorf("delete bucket %s: %w", name, err)
}
return
}
diff --git a/weed/remote_storage/azure/azure_storage_client_test.go b/weed/remote_storage/azure/azure_storage_client_test.go
new file mode 100644
index 000000000..acb7dbd17
--- /dev/null
+++ b/weed/remote_storage/azure/azure_storage_client_test.go
@@ -0,0 +1,377 @@
+package azure
+
+import (
+ "bytes"
+ "fmt"
+ "os"
+ "testing"
+ "time"
+
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/pb/remote_pb"
+ "github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
+)
+
+// TestAzureStorageClientBasic tests basic Azure storage client operations
+func TestAzureStorageClientBasic(t *testing.T) {
+ // Skip if credentials not available
+ accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
+ accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY")
+ testContainer := os.Getenv("AZURE_TEST_CONTAINER")
+
+ if accountName == "" || accountKey == "" {
+ t.Skip("Skipping Azure storage test: AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY not set")
+ }
+ if testContainer == "" {
+ testContainer = "seaweedfs-test"
+ }
+
+ // Create client
+ maker := azureRemoteStorageMaker{}
+ conf := &remote_pb.RemoteConf{
+ Name: "test-azure",
+ AzureAccountName: accountName,
+ AzureAccountKey: accountKey,
+ }
+
+ client, err := maker.Make(conf)
+ if err != nil {
+ t.Fatalf("Failed to create Azure client: %v", err)
+ }
+
+ azClient := client.(*azureRemoteStorageClient)
+
+ // Test 1: Create bucket/container
+ t.Run("CreateBucket", func(t *testing.T) {
+ err := azClient.CreateBucket(testContainer)
+ // Ignore error if bucket already exists
+ if err != nil && !bloberror.HasCode(err, bloberror.ContainerAlreadyExists) {
+ t.Fatalf("Failed to create bucket: %v", err)
+ }
+ })
+
+ // Test 2: List buckets
+ t.Run("ListBuckets", func(t *testing.T) {
+ buckets, err := azClient.ListBuckets()
+ if err != nil {
+ t.Fatalf("Failed to list buckets: %v", err)
+ }
+ if len(buckets) == 0 {
+ t.Log("No buckets found (might be expected)")
+ } else {
+ t.Logf("Found %d buckets", len(buckets))
+ }
+ })
+
+ // Test 3: Write file
+ testContent := []byte("Hello from SeaweedFS Azure SDK migration test!")
+ testKey := fmt.Sprintf("/test-file-%d.txt", time.Now().Unix())
+ loc := &remote_pb.RemoteStorageLocation{
+ Name: "test-azure",
+ Bucket: testContainer,
+ Path: testKey,
+ }
+
+ t.Run("WriteFile", func(t *testing.T) {
+ entry := &filer_pb.Entry{
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ Mime: "text/plain",
+ },
+ Extended: map[string][]byte{
+ "x-amz-meta-test-key": []byte("test-value"),
+ },
+ }
+
+ reader := bytes.NewReader(testContent)
+ remoteEntry, err := azClient.WriteFile(loc, entry, reader)
+ if err != nil {
+ t.Fatalf("Failed to write file: %v", err)
+ }
+ if remoteEntry == nil {
+ t.Fatal("Remote entry is nil")
+ }
+ if remoteEntry.RemoteSize != int64(len(testContent)) {
+ t.Errorf("Expected size %d, got %d", len(testContent), remoteEntry.RemoteSize)
+ }
+ })
+
+ // Test 4: Read file
+ t.Run("ReadFile", func(t *testing.T) {
+ data, err := azClient.ReadFile(loc, 0, int64(len(testContent)))
+ if err != nil {
+ t.Fatalf("Failed to read file: %v", err)
+ }
+ if !bytes.Equal(data, testContent) {
+ t.Errorf("Content mismatch. Expected: %s, Got: %s", testContent, data)
+ }
+ })
+
+ // Test 5: Read partial file
+ t.Run("ReadPartialFile", func(t *testing.T) {
+ data, err := azClient.ReadFile(loc, 0, 5)
+ if err != nil {
+ t.Fatalf("Failed to read partial file: %v", err)
+ }
+ expected := testContent[:5]
+ if !bytes.Equal(data, expected) {
+ t.Errorf("Content mismatch. Expected: %s, Got: %s", expected, data)
+ }
+ })
+
+ // Test 6: Update metadata
+ t.Run("UpdateMetadata", func(t *testing.T) {
+ oldEntry := &filer_pb.Entry{
+ Extended: map[string][]byte{
+ "x-amz-meta-test-key": []byte("test-value"),
+ },
+ }
+ newEntry := &filer_pb.Entry{
+ Extended: map[string][]byte{
+ "x-amz-meta-test-key": []byte("test-value"),
+ "x-amz-meta-new-key": []byte("new-value"),
+ },
+ }
+ err := azClient.UpdateFileMetadata(loc, oldEntry, newEntry)
+ if err != nil {
+ t.Fatalf("Failed to update metadata: %v", err)
+ }
+ })
+
+ // Test 7: Traverse (list objects)
+ t.Run("Traverse", func(t *testing.T) {
+ foundFile := false
+ err := azClient.Traverse(loc, func(dir string, name string, isDir bool, remoteEntry *filer_pb.RemoteEntry) error {
+ if !isDir && name == testKey[1:] { // Remove leading slash
+ foundFile = true
+ }
+ return nil
+ })
+ if err != nil {
+ t.Fatalf("Failed to traverse: %v", err)
+ }
+ if !foundFile {
+ t.Log("Test file not found in traverse (might be expected due to path matching)")
+ }
+ })
+
+ // Test 8: Delete file
+ t.Run("DeleteFile", func(t *testing.T) {
+ err := azClient.DeleteFile(loc)
+ if err != nil {
+ t.Fatalf("Failed to delete file: %v", err)
+ }
+ })
+
+ // Test 9: Verify file deleted (should fail)
+ t.Run("VerifyDeleted", func(t *testing.T) {
+ _, err := azClient.ReadFile(loc, 0, 10)
+ if !bloberror.HasCode(err, bloberror.BlobNotFound) {
+ t.Errorf("Expected BlobNotFound error, but got: %v", err)
+ }
+ })
+
+ // Clean up: Try to delete the test container
+ // Comment out if you want to keep the container
+ /*
+ t.Run("DeleteBucket", func(t *testing.T) {
+ err := azClient.DeleteBucket(testContainer)
+ if err != nil {
+ t.Logf("Warning: Failed to delete bucket: %v", err)
+ }
+ })
+ */
+}
+
+// TestToMetadata tests the metadata conversion function
+func TestToMetadata(t *testing.T) {
+ tests := []struct {
+ name string
+ input map[string][]byte
+ expected map[string]*string
+ }{
+ {
+ name: "basic metadata",
+ input: map[string][]byte{
+ s3_constants.AmzUserMetaPrefix + "key1": []byte("value1"),
+ s3_constants.AmzUserMetaPrefix + "key2": []byte("value2"),
+ },
+ expected: map[string]*string{
+ "key1": stringPtr("value1"),
+ "key2": stringPtr("value2"),
+ },
+ },
+ {
+ name: "metadata with dashes",
+ input: map[string][]byte{
+ s3_constants.AmzUserMetaPrefix + "content-type": []byte("text/plain"),
+ },
+ expected: map[string]*string{
+ "content_2d_type": stringPtr("text/plain"), // dash (0x2d) -> _2d_
+ },
+ },
+ {
+ name: "non-metadata keys ignored",
+ input: map[string][]byte{
+ "some-other-key": []byte("ignored"),
+ s3_constants.AmzUserMetaPrefix + "included": []byte("included"),
+ },
+ expected: map[string]*string{
+ "included": stringPtr("included"),
+ },
+ },
+ {
+ name: "keys starting with digits",
+ input: map[string][]byte{
+ s3_constants.AmzUserMetaPrefix + "123key": []byte("value1"),
+ s3_constants.AmzUserMetaPrefix + "456-test": []byte("value2"),
+ s3_constants.AmzUserMetaPrefix + "789": []byte("value3"),
+ },
+ expected: map[string]*string{
+ "_123key": stringPtr("value1"), // starts with digit -> prefix _
+ "_456_2d_test": stringPtr("value2"), // starts with digit AND has dash
+ "_789": stringPtr("value3"),
+ },
+ },
+ {
+ name: "uppercase and mixed case keys",
+ input: map[string][]byte{
+ s3_constants.AmzUserMetaPrefix + "My-Key": []byte("value1"),
+ s3_constants.AmzUserMetaPrefix + "UPPERCASE": []byte("value2"),
+ s3_constants.AmzUserMetaPrefix + "MiXeD-CaSe": []byte("value3"),
+ },
+ expected: map[string]*string{
+ "my_2d_key": stringPtr("value1"), // lowercase + dash -> _2d_
+ "uppercase": stringPtr("value2"),
+ "mixed_2d_case": stringPtr("value3"),
+ },
+ },
+ {
+ name: "keys with invalid characters",
+ input: map[string][]byte{
+ s3_constants.AmzUserMetaPrefix + "my.key": []byte("value1"),
+ s3_constants.AmzUserMetaPrefix + "key+plus": []byte("value2"),
+ s3_constants.AmzUserMetaPrefix + "key@symbol": []byte("value3"),
+ s3_constants.AmzUserMetaPrefix + "key-with.": []byte("value4"),
+ s3_constants.AmzUserMetaPrefix + "key/slash": []byte("value5"),
+ },
+ expected: map[string]*string{
+ "my_2e_key": stringPtr("value1"), // dot (0x2e) -> _2e_
+ "key_2b_plus": stringPtr("value2"), // plus (0x2b) -> _2b_
+ "key_40_symbol": stringPtr("value3"), // @ (0x40) -> _40_
+ "key_2d_with_2e_": stringPtr("value4"), // dash and dot
+ "key_2f_slash": stringPtr("value5"), // slash (0x2f) -> _2f_
+ },
+ },
+ {
+ name: "collision prevention",
+ input: map[string][]byte{
+ s3_constants.AmzUserMetaPrefix + "my-key": []byte("value1"),
+ s3_constants.AmzUserMetaPrefix + "my.key": []byte("value2"),
+ s3_constants.AmzUserMetaPrefix + "my_key": []byte("value3"),
+ },
+ expected: map[string]*string{
+ "my_2d_key": stringPtr("value1"), // dash (0x2d)
+ "my_2e_key": stringPtr("value2"), // dot (0x2e)
+ "my_key": stringPtr("value3"), // underscore is valid, no encoding
+ },
+ },
+ {
+ name: "empty input",
+ input: map[string][]byte{},
+ expected: map[string]*string{},
+ },
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ result := toMetadata(tt.input)
+ if len(result) != len(tt.expected) {
+ t.Errorf("Expected %d keys, got %d", len(tt.expected), len(result))
+ }
+ for key, expectedVal := range tt.expected {
+ if resultVal, ok := result[key]; !ok {
+ t.Errorf("Expected key %s not found", key)
+ } else if resultVal == nil || expectedVal == nil {
+ if resultVal != expectedVal {
+ t.Errorf("For key %s: expected %v, got %v", key, expectedVal, resultVal)
+ }
+ } else if *resultVal != *expectedVal {
+ t.Errorf("For key %s: expected %s, got %s", key, *expectedVal, *resultVal)
+ }
+ }
+ })
+ }
+}
+
+func contains(s, substr string) bool {
+ return bytes.Contains([]byte(s), []byte(substr))
+}
+
+func stringPtr(s string) *string {
+ return &s
+}
+
+// Benchmark tests
+func BenchmarkToMetadata(b *testing.B) {
+ input := map[string][]byte{
+ "x-amz-meta-key1": []byte("value1"),
+ "x-amz-meta-key2": []byte("value2"),
+ "x-amz-meta-content-type": []byte("text/plain"),
+ "other-key": []byte("ignored"),
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ toMetadata(input)
+ }
+}
+
+// Test that the maker implements the interface
+func TestAzureRemoteStorageMaker(t *testing.T) {
+ maker := azureRemoteStorageMaker{}
+
+ if !maker.HasBucket() {
+ t.Error("Expected HasBucket() to return true")
+ }
+
+ // Test with missing credentials
+ conf := &remote_pb.RemoteConf{
+ Name: "test",
+ }
+ _, err := maker.Make(conf)
+ if err == nil {
+ t.Error("Expected error with missing credentials")
+ }
+}
+
+// Test error cases
+func TestAzureStorageClientErrors(t *testing.T) {
+ // Test with invalid credentials
+ maker := azureRemoteStorageMaker{}
+ conf := &remote_pb.RemoteConf{
+ Name: "test",
+ AzureAccountName: "invalid",
+ AzureAccountKey: "aW52YWxpZGtleQ==", // base64 encoded "invalidkey"
+ }
+
+ client, err := maker.Make(conf)
+ if err != nil {
+ t.Skip("Invalid credentials correctly rejected at client creation")
+ }
+
+ // If client creation succeeded, operations should fail
+ azClient := client.(*azureRemoteStorageClient)
+ loc := &remote_pb.RemoteStorageLocation{
+ Name: "test",
+ Bucket: "nonexistent",
+ Path: "/test.txt",
+ }
+
+ // These operations should fail with invalid credentials
+ _, err = azClient.ReadFile(loc, 0, 10)
+ if err == nil {
+ t.Log("Expected error with invalid credentials on ReadFile, but got none (might be cached)")
+ }
+}
diff --git a/weed/replication/sink/azuresink/azure_sink.go b/weed/replication/sink/azuresink/azure_sink.go
index fb28355bc..b0e40e1a7 100644
--- a/weed/replication/sink/azuresink/azure_sink.go
+++ b/weed/replication/sink/azuresink/azure_sink.go
@@ -3,24 +3,31 @@ package azuresink
import (
"bytes"
"context"
+ "errors"
"fmt"
- "github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
"net/http"
- "net/url"
"strings"
"time"
- "github.com/Azure/azure-storage-blob-go/azblob"
+ "github.com/Azure/azure-sdk-for-go/sdk/azcore"
+ "github.com/Azure/azure-sdk-for-go/sdk/azcore/policy"
+ "github.com/Azure/azure-sdk-for-go/sdk/azcore/streaming"
+ "github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob"
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/appendblob"
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/blob"
+ "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/bloberror"
"github.com/seaweedfs/seaweedfs/weed/filer"
"github.com/seaweedfs/seaweedfs/weed/glog"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+ "github.com/seaweedfs/seaweedfs/weed/replication/repl_util"
"github.com/seaweedfs/seaweedfs/weed/replication/sink"
"github.com/seaweedfs/seaweedfs/weed/replication/source"
"github.com/seaweedfs/seaweedfs/weed/util"
)
type AzureSink struct {
- containerURL azblob.ContainerURL
+ client *azblob.Client
container string
dir string
filerSource *source.FilerSource
@@ -61,20 +68,28 @@ func (g *AzureSink) initialize(accountName, accountKey, container, dir string) e
g.container = container
g.dir = dir
- // Use your Storage account's name and key to create a credential object.
+ // Create credential and client
credential, err := azblob.NewSharedKeyCredential(accountName, accountKey)
if err != nil {
- glog.Fatalf("failed to create Azure credential with account name:%s: %v", accountName, err)
+ return fmt.Errorf("failed to create Azure credential with account name:%s: %w", accountName, err)
}
- // Create a request pipeline that is used to process HTTP(S) requests and responses.
- p := azblob.NewPipeline(credential, azblob.PipelineOptions{})
-
- // Create an ServiceURL object that wraps the service URL and a request pipeline.
- u, _ := url.Parse(fmt.Sprintf("https://%s.blob.core.windows.net", accountName))
- serviceURL := azblob.NewServiceURL(*u, p)
+ serviceURL := fmt.Sprintf("https://%s.blob.core.windows.net/", accountName)
+ client, err := azblob.NewClientWithSharedKeyCredential(serviceURL, credential, &azblob.ClientOptions{
+ ClientOptions: azcore.ClientOptions{
+ Retry: policy.RetryOptions{
+ MaxRetries: 10, // Increased from default 3 for replication sink resiliency
+ TryTimeout: time.Minute,
+ RetryDelay: 2 * time.Second,
+ MaxRetryDelay: time.Minute,
+ },
+ },
+ })
+ if err != nil {
+ return fmt.Errorf("failed to create Azure client: %w", err)
+ }
- g.containerURL = serviceURL.NewContainerURL(g.container)
+ g.client = client
return nil
}
@@ -87,13 +102,19 @@ func (g *AzureSink) DeleteEntry(key string, isDirectory, deleteIncludeChunks boo
key = key + "/"
}
- if _, err := g.containerURL.NewBlobURL(key).Delete(context.Background(),
- azblob.DeleteSnapshotsOptionInclude, azblob.BlobAccessConditions{}); err != nil {
- return fmt.Errorf("azure delete %s/%s: %v", g.container, key, err)
+ blobClient := g.client.ServiceClient().NewContainerClient(g.container).NewBlobClient(key)
+ _, err := blobClient.Delete(context.Background(), &blob.DeleteOptions{
+ DeleteSnapshots: to.Ptr(blob.DeleteSnapshotsOptionTypeInclude),
+ })
+ if err != nil {
+ // Make delete idempotent - don't return error if blob doesn't exist
+ if bloberror.HasCode(err, bloberror.BlobNotFound) {
+ return nil
+ }
+ return fmt.Errorf("azure delete %s/%s: %w", g.container, key, err)
}
return nil
-
}
func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []int32) error {
@@ -107,26 +128,38 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
totalSize := filer.FileSize(entry)
chunkViews := filer.ViewFromChunks(context.Background(), g.filerSource.LookupFileId, entry.GetChunks(), 0, int64(totalSize))
- // Create a URL that references a to-be-created blob in your
- // Azure Storage account's container.
- appendBlobURL := g.containerURL.NewAppendBlobURL(key)
+ // Create append blob client
+ appendBlobClient := g.client.ServiceClient().NewContainerClient(g.container).NewAppendBlobClient(key)
- accessCondition := azblob.BlobAccessConditions{}
+ // Create blob with access conditions
+ accessConditions := &blob.AccessConditions{}
if entry.Attributes != nil && entry.Attributes.Mtime > 0 {
- accessCondition.ModifiedAccessConditions.IfUnmodifiedSince = time.Unix(entry.Attributes.Mtime, 0)
+ modifiedTime := time.Unix(entry.Attributes.Mtime, 0)
+ accessConditions.ModifiedAccessConditions = &blob.ModifiedAccessConditions{
+ IfUnmodifiedSince: &modifiedTime,
+ }
}
- res, err := appendBlobURL.Create(context.Background(), azblob.BlobHTTPHeaders{}, azblob.Metadata{}, accessCondition, azblob.BlobTagsMap{}, azblob.ClientProvidedKeyOptions{}, azblob.ImmutabilityPolicyOptions{})
- if res != nil && res.StatusCode() == http.StatusPreconditionFailed {
- glog.V(0).Infof("skip overwriting %s/%s: %v", g.container, key, err)
- return nil
- }
+ _, err := appendBlobClient.Create(context.Background(), &appendblob.CreateOptions{
+ AccessConditions: accessConditions,
+ })
+
if err != nil {
- return err
+ if bloberror.HasCode(err, bloberror.BlobAlreadyExists) {
+ // Blob already exists, which is fine for an append blob - we can append to it
+ } else {
+ // Check if this is a precondition failed error (HTTP 412)
+ var respErr *azcore.ResponseError
+ if ok := errors.As(err, &respErr); ok && respErr.StatusCode == http.StatusPreconditionFailed {
+ glog.V(0).Infof("skip overwriting %s/%s: precondition failed", g.container, key)
+ return nil
+ }
+ return fmt.Errorf("azure create append blob %s/%s: %w", g.container, key, err)
+ }
}
writeFunc := func(data []byte) error {
- _, writeErr := appendBlobURL.AppendBlock(context.Background(), bytes.NewReader(data), azblob.AppendBlobAccessConditions{}, nil, azblob.ClientProvidedKeyOptions{})
+ _, writeErr := appendBlobClient.AppendBlock(context.Background(), streaming.NopCloser(bytes.NewReader(data)), &appendblob.AppendBlockOptions{})
return writeErr
}
@@ -139,7 +172,6 @@ func (g *AzureSink) CreateEntry(key string, entry *filer_pb.Entry, signatures []
}
return nil
-
}
func (g *AzureSink) UpdateEntry(key string, oldEntry *filer_pb.Entry, newParentPath string, newEntry *filer_pb.Entry, deleteIncludeChunks bool, signatures []int32) (foundExistingEntry bool, err error) {
diff --git a/weed/replication/sink/azuresink/azure_sink_test.go b/weed/replication/sink/azuresink/azure_sink_test.go
new file mode 100644
index 000000000..e139086e6
--- /dev/null
+++ b/weed/replication/sink/azuresink/azure_sink_test.go
@@ -0,0 +1,355 @@
+package azuresink
+
+import (
+ "os"
+ "testing"
+ "time"
+
+ "github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
+)
+
+// MockConfiguration for testing
+type mockConfiguration struct {
+ values map[string]interface{}
+}
+
+func newMockConfiguration() *mockConfiguration {
+ return &mockConfiguration{
+ values: make(map[string]interface{}),
+ }
+}
+
+func (m *mockConfiguration) GetString(key string) string {
+ if v, ok := m.values[key]; ok {
+ return v.(string)
+ }
+ return ""
+}
+
+func (m *mockConfiguration) GetBool(key string) bool {
+ if v, ok := m.values[key]; ok {
+ return v.(bool)
+ }
+ return false
+}
+
+func (m *mockConfiguration) GetInt(key string) int {
+ if v, ok := m.values[key]; ok {
+ return v.(int)
+ }
+ return 0
+}
+
+func (m *mockConfiguration) GetInt64(key string) int64 {
+ if v, ok := m.values[key]; ok {
+ return v.(int64)
+ }
+ return 0
+}
+
+func (m *mockConfiguration) GetFloat64(key string) float64 {
+ if v, ok := m.values[key]; ok {
+ return v.(float64)
+ }
+ return 0.0
+}
+
+func (m *mockConfiguration) GetStringSlice(key string) []string {
+ if v, ok := m.values[key]; ok {
+ return v.([]string)
+ }
+ return nil
+}
+
+func (m *mockConfiguration) SetDefault(key string, value interface{}) {
+ if _, exists := m.values[key]; !exists {
+ m.values[key] = value
+ }
+}
+
+// Test the AzureSink interface implementation
+func TestAzureSinkInterface(t *testing.T) {
+ sink := &AzureSink{}
+
+ if sink.GetName() != "azure" {
+ t.Errorf("Expected name 'azure', got '%s'", sink.GetName())
+ }
+
+ // Test directory setting
+ sink.dir = "/test/dir"
+ if sink.GetSinkToDirectory() != "/test/dir" {
+ t.Errorf("Expected directory '/test/dir', got '%s'", sink.GetSinkToDirectory())
+ }
+
+ // Test incremental setting
+ sink.isIncremental = true
+ if !sink.IsIncremental() {
+ t.Error("Expected isIncremental to be true")
+ }
+}
+
+// Test Azure sink initialization
+func TestAzureSinkInitialization(t *testing.T) {
+ accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
+ accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY")
+ testContainer := os.Getenv("AZURE_TEST_CONTAINER")
+
+ if accountName == "" || accountKey == "" {
+ t.Skip("Skipping Azure sink test: AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY not set")
+ }
+ if testContainer == "" {
+ testContainer = "seaweedfs-test"
+ }
+
+ sink := &AzureSink{}
+
+ err := sink.initialize(accountName, accountKey, testContainer, "/test")
+ if err != nil {
+ t.Fatalf("Failed to initialize Azure sink: %v", err)
+ }
+
+ if sink.container != testContainer {
+ t.Errorf("Expected container '%s', got '%s'", testContainer, sink.container)
+ }
+
+ if sink.dir != "/test" {
+ t.Errorf("Expected dir '/test', got '%s'", sink.dir)
+ }
+
+ if sink.client == nil {
+ t.Error("Expected client to be initialized")
+ }
+}
+
+// Test configuration-based initialization
+func TestAzureSinkInitializeFromConfig(t *testing.T) {
+ accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
+ accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY")
+ testContainer := os.Getenv("AZURE_TEST_CONTAINER")
+
+ if accountName == "" || accountKey == "" {
+ t.Skip("Skipping Azure sink config test: AZURE_STORAGE_ACCOUNT or AZURE_STORAGE_ACCESS_KEY not set")
+ }
+ if testContainer == "" {
+ testContainer = "seaweedfs-test"
+ }
+
+ config := newMockConfiguration()
+ config.values["azure.account_name"] = accountName
+ config.values["azure.account_key"] = accountKey
+ config.values["azure.container"] = testContainer
+ config.values["azure.directory"] = "/test"
+ config.values["azure.is_incremental"] = true
+
+ sink := &AzureSink{}
+ err := sink.Initialize(config, "azure.")
+ if err != nil {
+ t.Fatalf("Failed to initialize from config: %v", err)
+ }
+
+ if !sink.IsIncremental() {
+ t.Error("Expected incremental to be true")
+ }
+}
+
+// Test cleanKey function
+func TestCleanKey(t *testing.T) {
+ tests := []struct {
+ input string
+ expected string
+ }{
+ {"/test/file.txt", "test/file.txt"},
+ {"test/file.txt", "test/file.txt"},
+ {"/", ""},
+ {"", ""},
+ {"/a/b/c", "a/b/c"},
+ }
+
+ for _, tt := range tests {
+ t.Run(tt.input, func(t *testing.T) {
+ result := cleanKey(tt.input)
+ if result != tt.expected {
+ t.Errorf("cleanKey(%q) = %q, want %q", tt.input, result, tt.expected)
+ }
+ })
+ }
+}
+
+// Test entry operations (requires valid credentials)
+func TestAzureSinkEntryOperations(t *testing.T) {
+ accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
+ accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY")
+ testContainer := os.Getenv("AZURE_TEST_CONTAINER")
+
+ if accountName == "" || accountKey == "" {
+ t.Skip("Skipping Azure sink entry test: credentials not set")
+ }
+ if testContainer == "" {
+ testContainer = "seaweedfs-test"
+ }
+
+ sink := &AzureSink{}
+ err := sink.initialize(accountName, accountKey, testContainer, "/test")
+ if err != nil {
+ t.Fatalf("Failed to initialize: %v", err)
+ }
+
+ // Test CreateEntry with directory (should be no-op)
+ t.Run("CreateDirectory", func(t *testing.T) {
+ entry := &filer_pb.Entry{
+ IsDirectory: true,
+ }
+ err := sink.CreateEntry("/test/dir", entry, nil)
+ if err != nil {
+ t.Errorf("CreateEntry for directory should not error: %v", err)
+ }
+ })
+
+ // Test CreateEntry with file
+ testKey := "/test-sink-file-" + time.Now().Format("20060102-150405") + ".txt"
+ t.Run("CreateFile", func(t *testing.T) {
+ entry := &filer_pb.Entry{
+ IsDirectory: false,
+ Content: []byte("Test content for Azure sink"),
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ },
+ }
+ err := sink.CreateEntry(testKey, entry, nil)
+ if err != nil {
+ t.Fatalf("Failed to create entry: %v", err)
+ }
+ })
+
+ // Test UpdateEntry
+ t.Run("UpdateEntry", func(t *testing.T) {
+ oldEntry := &filer_pb.Entry{
+ Content: []byte("Old content"),
+ }
+ newEntry := &filer_pb.Entry{
+ Content: []byte("New content for update test"),
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ },
+ }
+ found, err := sink.UpdateEntry(testKey, oldEntry, "/test", newEntry, false, nil)
+ if err != nil {
+ t.Fatalf("Failed to update entry: %v", err)
+ }
+ if !found {
+ t.Error("Expected found to be true")
+ }
+ })
+
+ // Test DeleteEntry
+ t.Run("DeleteFile", func(t *testing.T) {
+ err := sink.DeleteEntry(testKey, false, false, nil)
+ if err != nil {
+ t.Fatalf("Failed to delete entry: %v", err)
+ }
+ })
+
+ // Test DeleteEntry with directory marker
+ testDirKey := "/test-dir-" + time.Now().Format("20060102-150405")
+ t.Run("DeleteDirectory", func(t *testing.T) {
+ // First create a directory marker
+ entry := &filer_pb.Entry{
+ IsDirectory: false,
+ Content: []byte(""),
+ }
+ err := sink.CreateEntry(testDirKey+"/", entry, nil)
+ if err != nil {
+ t.Logf("Warning: Failed to create directory marker: %v", err)
+ }
+
+ // Then delete it
+ err = sink.DeleteEntry(testDirKey, true, false, nil)
+ if err != nil {
+ t.Logf("Warning: Failed to delete directory: %v", err)
+ }
+ })
+}
+
+// Test CreateEntry with precondition (IfUnmodifiedSince)
+func TestAzureSinkPrecondition(t *testing.T) {
+ accountName := os.Getenv("AZURE_STORAGE_ACCOUNT")
+ accountKey := os.Getenv("AZURE_STORAGE_ACCESS_KEY")
+ testContainer := os.Getenv("AZURE_TEST_CONTAINER")
+
+ if accountName == "" || accountKey == "" {
+ t.Skip("Skipping Azure sink precondition test: credentials not set")
+ }
+ if testContainer == "" {
+ testContainer = "seaweedfs-test"
+ }
+
+ sink := &AzureSink{}
+ err := sink.initialize(accountName, accountKey, testContainer, "/test")
+ if err != nil {
+ t.Fatalf("Failed to initialize: %v", err)
+ }
+
+ testKey := "/test-precondition-" + time.Now().Format("20060102-150405") + ".txt"
+
+ // Create initial entry
+ entry := &filer_pb.Entry{
+ Content: []byte("Initial content"),
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Unix(),
+ },
+ }
+ err = sink.CreateEntry(testKey, entry, nil)
+ if err != nil {
+ t.Fatalf("Failed to create initial entry: %v", err)
+ }
+
+ // Try to create again with old mtime (should be skipped due to precondition)
+ oldEntry := &filer_pb.Entry{
+ Content: []byte("Should not overwrite"),
+ Attributes: &filer_pb.FuseAttributes{
+ Mtime: time.Now().Add(-1 * time.Hour).Unix(), // Old timestamp
+ },
+ }
+ err = sink.CreateEntry(testKey, oldEntry, nil)
+ // Should either succeed (skip) or fail with precondition error
+ if err != nil {
+ t.Logf("Create with old mtime: %v (expected)", err)
+ }
+
+ // Clean up
+ sink.DeleteEntry(testKey, false, false, nil)
+}
+
+// Benchmark tests
+func BenchmarkCleanKey(b *testing.B) {
+ keys := []string{
+ "/simple/path.txt",
+ "no/leading/slash.txt",
+ "/",
+ "/complex/path/with/many/segments/file.txt",
+ }
+
+ b.ResetTimer()
+ for i := 0; i < b.N; i++ {
+ cleanKey(keys[i%len(keys)])
+ }
+}
+
+// Test error handling with invalid credentials
+func TestAzureSinkInvalidCredentials(t *testing.T) {
+ sink := &AzureSink{}
+
+ err := sink.initialize("invalid-account", "aW52YWxpZGtleQ==", "test-container", "/test")
+ if err != nil {
+ t.Skip("Invalid credentials correctly rejected at initialization")
+ }
+
+ // If initialization succeeded, operations should fail
+ entry := &filer_pb.Entry{
+ Content: []byte("test"),
+ }
+ err = sink.CreateEntry("/test.txt", entry, nil)
+ if err == nil {
+ t.Log("Expected error with invalid credentials, but got none (might be cached)")
+ }
+}
diff --git a/weed/s3api/auth_credentials.go b/weed/s3api/auth_credentials.go
index 1f147e884..e3e7c0bbb 100644
--- a/weed/s3api/auth_credentials.go
+++ b/weed/s3api/auth_credentials.go
@@ -56,10 +56,10 @@ type IdentityAccessManagement struct {
}
type Identity struct {
- Name string
- Account *Account
- Credentials []*Credential
- Actions []Action
+ Name string
+ Account *Account
+ Credentials []*Credential
+ Actions []Action
PrincipalArn string // ARN for IAM authorization (e.g., "arn:seaweed:iam::user/username")
}
diff --git a/weed/s3api/filer_multipart.go b/weed/s3api/filer_multipart.go
index c6de70738..d63e10364 100644
--- a/weed/s3api/filer_multipart.go
+++ b/weed/s3api/filer_multipart.go
@@ -294,7 +294,7 @@ func (s3a *S3ApiServer) completeMultipartUpload(r *http.Request, input *s3.Compl
ETag: chunk.ETag,
IsCompressed: chunk.IsCompressed,
// Preserve SSE metadata with updated within-part offset
- SseType: chunk.SseType,
+ SseType: chunk.SseType,
SseMetadata: sseKmsMetadata,
}
finalParts = append(finalParts, p)
diff --git a/weed/s3api/policy_engine/types.go b/weed/s3api/policy_engine/types.go
index 5f417afb4..d68b1f297 100644
--- a/weed/s3api/policy_engine/types.go
+++ b/weed/s3api/policy_engine/types.go
@@ -407,8 +407,6 @@ func (cs *CompiledStatement) EvaluateStatement(args *PolicyEvaluationArgs) bool
return false
}
-
-
return true
}
diff --git a/weed/s3api/s3_list_parts_action_test.go b/weed/s3api/s3_list_parts_action_test.go
index 4c0a28eff..c0e9aa8a1 100644
--- a/weed/s3api/s3_list_parts_action_test.go
+++ b/weed/s3api/s3_list_parts_action_test.go
@@ -35,7 +35,7 @@ func TestListPartsActionMapping(t *testing.T) {
{
name: "get_object_with_uploadId",
method: "GET",
- bucket: "test-bucket",
+ bucket: "test-bucket",
objectKey: "test-object.txt",
queryParams: map[string]string{"uploadId": "test-upload-id"},
fallbackAction: s3_constants.ACTION_READ,
@@ -43,14 +43,14 @@ func TestListPartsActionMapping(t *testing.T) {
description: "GET request with uploadId should map to s3:ListParts (this was the missing mapping)",
},
{
- name: "get_object_with_uploadId_and_other_params",
- method: "GET",
- bucket: "test-bucket",
- objectKey: "test-object.txt",
+ name: "get_object_with_uploadId_and_other_params",
+ method: "GET",
+ bucket: "test-bucket",
+ objectKey: "test-object.txt",
queryParams: map[string]string{
- "uploadId": "test-upload-id-123",
- "max-parts": "100",
- "part-number-marker": "50",
+ "uploadId": "test-upload-id-123",
+ "max-parts": "100",
+ "part-number-marker": "50",
},
fallbackAction: s3_constants.ACTION_READ,
expectedAction: "s3:ListParts",
@@ -107,7 +107,7 @@ func TestListPartsActionMapping(t *testing.T) {
action := determineGranularS3Action(req, tc.fallbackAction, tc.bucket, tc.objectKey)
// Verify the action mapping
- assert.Equal(t, tc.expectedAction, action,
+ assert.Equal(t, tc.expectedAction, action,
"Test case: %s - %s", tc.name, tc.description)
})
}
@@ -145,17 +145,17 @@ func TestListPartsActionMappingSecurityScenarios(t *testing.T) {
t.Run("policy_enforcement_precision", func(t *testing.T) {
// This test documents the security improvement - before the fix, both operations
// would incorrectly map to s3:GetObject, preventing fine-grained access control
-
+
testCases := []struct {
description string
- queryParams map[string]string
+ queryParams map[string]string
expectedAction string
securityNote string
}{
{
description: "List multipart upload parts",
queryParams: map[string]string{"uploadId": "upload-abc123"},
- expectedAction: "s3:ListParts",
+ expectedAction: "s3:ListParts",
securityNote: "FIXED: Now correctly maps to s3:ListParts instead of s3:GetObject",
},
{
@@ -165,7 +165,7 @@ func TestListPartsActionMappingSecurityScenarios(t *testing.T) {
securityNote: "UNCHANGED: Still correctly maps to s3:GetObject",
},
{
- description: "Get object with complex upload ID",
+ description: "Get object with complex upload ID",
queryParams: map[string]string{"uploadId": "complex-upload-id-with-hyphens-123-abc-def"},
expectedAction: "s3:ListParts",
securityNote: "FIXED: Complex upload IDs now correctly detected",
@@ -185,8 +185,8 @@ func TestListPartsActionMappingSecurityScenarios(t *testing.T) {
req.URL.RawQuery = query.Encode()
action := determineGranularS3Action(req, s3_constants.ACTION_READ, "test-bucket", "test-object")
-
- assert.Equal(t, tc.expectedAction, action,
+
+ assert.Equal(t, tc.expectedAction, action,
"%s - %s", tc.description, tc.securityNote)
}
})
@@ -196,7 +196,7 @@ func TestListPartsActionMappingSecurityScenarios(t *testing.T) {
func TestListPartsActionRealWorldScenarios(t *testing.T) {
t.Run("large_file_upload_workflow", func(t *testing.T) {
// Simulate a large file upload workflow where users need different permissions for each step
-
+
// Step 1: Initiate multipart upload (POST with uploads query)
req1 := &http.Request{
Method: "POST",
@@ -206,7 +206,7 @@ func TestListPartsActionRealWorldScenarios(t *testing.T) {
query1.Set("uploads", "")
req1.URL.RawQuery = query1.Encode()
action1 := determineGranularS3Action(req1, s3_constants.ACTION_WRITE, "data", "large-dataset.csv")
-
+
// Step 2: List existing parts (GET with uploadId query) - THIS WAS THE MISSING MAPPING
req2 := &http.Request{
Method: "GET",
@@ -216,7 +216,7 @@ func TestListPartsActionRealWorldScenarios(t *testing.T) {
query2.Set("uploadId", "dataset-upload-20240827-001")
req2.URL.RawQuery = query2.Encode()
action2 := determineGranularS3Action(req2, s3_constants.ACTION_READ, "data", "large-dataset.csv")
-
+
// Step 3: Upload a part (PUT with uploadId and partNumber)
req3 := &http.Request{
Method: "PUT",
@@ -227,7 +227,7 @@ func TestListPartsActionRealWorldScenarios(t *testing.T) {
query3.Set("partNumber", "5")
req3.URL.RawQuery = query3.Encode()
action3 := determineGranularS3Action(req3, s3_constants.ACTION_WRITE, "data", "large-dataset.csv")
-
+
// Step 4: Complete multipart upload (POST with uploadId)
req4 := &http.Request{
Method: "POST",
@@ -241,15 +241,15 @@ func TestListPartsActionRealWorldScenarios(t *testing.T) {
// Verify each step has the correct action mapping
assert.Equal(t, "s3:CreateMultipartUpload", action1, "Step 1: Initiate upload")
assert.Equal(t, "s3:ListParts", action2, "Step 2: List parts (FIXED by this PR)")
- assert.Equal(t, "s3:UploadPart", action3, "Step 3: Upload part")
+ assert.Equal(t, "s3:UploadPart", action3, "Step 3: Upload part")
assert.Equal(t, "s3:CompleteMultipartUpload", action4, "Step 4: Complete upload")
-
+
// Verify that each step requires different permissions (security principle)
actions := []string{action1, action2, action3, action4}
for i, action := range actions {
for j, otherAction := range actions {
if i != j {
- assert.NotEqual(t, action, otherAction,
+ assert.NotEqual(t, action, otherAction,
"Each multipart operation step should require different permissions for fine-grained control")
}
}
@@ -258,7 +258,7 @@ func TestListPartsActionRealWorldScenarios(t *testing.T) {
t.Run("edge_case_upload_ids", func(t *testing.T) {
// Test various upload ID formats to ensure the fix works with real AWS-compatible upload IDs
-
+
testUploadIds := []string{
"simple123",
"complex-upload-id-with-hyphens",
@@ -276,10 +276,10 @@ func TestListPartsActionRealWorldScenarios(t *testing.T) {
query := req.URL.Query()
query.Set("uploadId", uploadId)
req.URL.RawQuery = query.Encode()
-
+
action := determineGranularS3Action(req, s3_constants.ACTION_READ, "test-bucket", "test-file.bin")
-
- assert.Equal(t, "s3:ListParts", action,
+
+ assert.Equal(t, "s3:ListParts", action,
"Upload ID format %s should be correctly detected and mapped to s3:ListParts", uploadId)
}
})
diff --git a/weed/s3api/s3_sse_multipart_test.go b/weed/s3api/s3_sse_multipart_test.go
index 804e4ab4a..ba67a4c5c 100644
--- a/weed/s3api/s3_sse_multipart_test.go
+++ b/weed/s3api/s3_sse_multipart_test.go
@@ -6,7 +6,7 @@ import (
"io"
"strings"
"testing"
-
+
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
)
diff --git a/weed/s3api/s3api_object_handlers_put.go b/weed/s3api/s3api_object_handlers_put.go
index cc2fb3dfd..6a846120a 100644
--- a/weed/s3api/s3api_object_handlers_put.go
+++ b/weed/s3api/s3api_object_handlers_put.go
@@ -20,8 +20,8 @@ import (
"github.com/seaweedfs/seaweedfs/weed/s3api/s3err"
"github.com/seaweedfs/seaweedfs/weed/security"
weed_server "github.com/seaweedfs/seaweedfs/weed/server"
- "github.com/seaweedfs/seaweedfs/weed/util/constants"
stats_collect "github.com/seaweedfs/seaweedfs/weed/stats"
+ "github.com/seaweedfs/seaweedfs/weed/util/constants"
)
// Object lock validation errors
diff --git a/weed/server/filer_server_handlers_write.go b/weed/server/filer_server_handlers_write.go
index 1535ba207..4f1ca05be 100644
--- a/weed/server/filer_server_handlers_write.go
+++ b/weed/server/filer_server_handlers_write.go
@@ -15,10 +15,10 @@ import (
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/security"
- "github.com/seaweedfs/seaweedfs/weed/util/constants"
"github.com/seaweedfs/seaweedfs/weed/stats"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/constants"
util_http "github.com/seaweedfs/seaweedfs/weed/util/http"
)
diff --git a/weed/server/filer_server_handlers_write_autochunk.go b/weed/server/filer_server_handlers_write_autochunk.go
index 46fa2519d..a535ff16c 100644
--- a/weed/server/filer_server_handlers_write_autochunk.go
+++ b/weed/server/filer_server_handlers_write_autochunk.go
@@ -20,9 +20,9 @@ import (
"github.com/seaweedfs/seaweedfs/weed/operation"
"github.com/seaweedfs/seaweedfs/weed/pb/filer_pb"
"github.com/seaweedfs/seaweedfs/weed/s3api/s3_constants"
- "github.com/seaweedfs/seaweedfs/weed/util/constants"
"github.com/seaweedfs/seaweedfs/weed/storage/needle"
"github.com/seaweedfs/seaweedfs/weed/util"
+ "github.com/seaweedfs/seaweedfs/weed/util/constants"
)
func (fs *FilerServer) autoChunk(ctx context.Context, w http.ResponseWriter, r *http.Request, contentLength int64, so *operation.StorageOption) {
diff --git a/weed/server/master_grpc_server_volume.go b/weed/server/master_grpc_server_volume.go
index 719cd4b74..a7ef8e7e9 100644
--- a/weed/server/master_grpc_server_volume.go
+++ b/weed/server/master_grpc_server_volume.go
@@ -29,7 +29,7 @@ const (
func (ms *MasterServer) DoAutomaticVolumeGrow(req *topology.VolumeGrowRequest) {
if ms.option.VolumeGrowthDisabled {
- glog.V(1).Infof("automatic volume grow disabled")
+ glog.V(1).Infof("automatic volume grow disabled")
return
}
glog.V(1).Infoln("starting automatic volume grow")
diff --git a/weed/shell/command_volume_check_disk.go b/weed/shell/command_volume_check_disk.go
index 4d246e26c..fbad37f02 100644
--- a/weed/shell/command_volume_check_disk.go
+++ b/weed/shell/command_volume_check_disk.go
@@ -185,18 +185,18 @@ func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeRepl
aHasChanges, bHasChanges := true, true
const maxIterations = 5
iteration := 0
-
+
for (aHasChanges || bHasChanges) && iteration < maxIterations {
iteration++
if verbose {
fmt.Fprintf(c.writer, "sync iteration %d for volume %d\n", iteration, a.info.Id)
}
-
+
prevAHasChanges, prevBHasChanges := aHasChanges, bHasChanges
if aHasChanges, bHasChanges, err = c.checkBoth(a, b, applyChanges, doSyncDeletions, nonRepairThreshold, verbose); err != nil {
return err
}
-
+
// Detect if we're stuck in a loop with no progress
if iteration > 1 && prevAHasChanges == aHasChanges && prevBHasChanges == bHasChanges && (aHasChanges || bHasChanges) {
fmt.Fprintf(c.writer, "volume %d sync is not making progress between %s and %s after iteration %d, stopping to prevent infinite loop\n",
@@ -204,13 +204,13 @@ func (c *commandVolumeCheckDisk) syncTwoReplicas(a *VolumeReplica, b *VolumeRepl
return fmt.Errorf("sync not making progress after %d iterations", iteration)
}
}
-
+
if iteration >= maxIterations && (aHasChanges || bHasChanges) {
fmt.Fprintf(c.writer, "volume %d sync reached maximum iterations (%d) between %s and %s, may need manual intervention\n",
a.info.Id, maxIterations, a.location.dataNode.Id, b.location.dataNode.Id)
return fmt.Errorf("reached maximum sync iterations (%d)", maxIterations)
}
-
+
return nil
}