diff options
Diffstat (limited to 'test/s3/multipart/aws_upload.go')
| -rw-r--r-- | test/s3/multipart/aws_upload.go | 175 |
1 files changed, 175 insertions, 0 deletions
diff --git a/test/s3/multipart/aws_upload.go b/test/s3/multipart/aws_upload.go new file mode 100644 index 000000000..8c15cf6ed --- /dev/null +++ b/test/s3/multipart/aws_upload.go @@ -0,0 +1,175 @@ +package main + +// copied from https://github.com/apoorvam/aws-s3-multipart-upload + +import ( + "bytes" + "flag" + "fmt" + "net/http" + "os" + + "github.com/aws/aws-sdk-go/aws" + "github.com/aws/aws-sdk-go/aws/awserr" + "github.com/aws/aws-sdk-go/aws/credentials" + "github.com/aws/aws-sdk-go/aws/session" + "github.com/aws/aws-sdk-go/service/s3" +) + +const ( + maxPartSize = int64(5 * 1024 * 1024) + maxRetries = 3 + awsAccessKeyID = "Your access key" + awsSecretAccessKey = "Your secret key" + awsBucketRegion = "S3 bucket region" + awsBucketName = "newBucket" +) + +var ( + filename = flag.String("f", "", "the file name") +) + +func main() { + flag.Parse() + + creds := credentials.NewStaticCredentials(awsAccessKeyID, awsSecretAccessKey, "") + _, err := creds.Get() + if err != nil { + fmt.Printf("bad credentials: %s", err) + } + cfg := aws.NewConfig().WithRegion(awsBucketRegion).WithCredentials(creds).WithDisableSSL(true).WithEndpoint("localhost:8333") + svc := s3.New(session.New(), cfg) + + file, err := os.Open(*filename) + if err != nil { + fmt.Printf("err opening file: %s", err) + return + } + defer file.Close() + fileInfo, _ := file.Stat() + size := fileInfo.Size() + buffer := make([]byte, size) + fileType := http.DetectContentType(buffer) + file.Read(buffer) + + path := "/media/" + file.Name() + input := &s3.CreateMultipartUploadInput{ + Bucket: aws.String(awsBucketName), + Key: aws.String(path), + ContentType: aws.String(fileType), + } + + resp, err := svc.CreateMultipartUpload(input) + if err != nil { + fmt.Println(err.Error()) + return + } + fmt.Println("Created multipart upload request") + + var curr, partLength int64 + var remaining = size + var completedParts []*s3.CompletedPart + partNumber := 1 + for curr = 0; remaining != 0; curr += partLength { + if remaining < maxPartSize { + partLength = remaining + } else { + partLength = maxPartSize + } + completedPart, err := uploadPart(svc, resp, buffer[curr:curr+partLength], partNumber) + if err != nil { + fmt.Println(err.Error()) + err := abortMultipartUpload(svc, resp) + if err != nil { + fmt.Println(err.Error()) + } + return + } + remaining -= partLength + partNumber++ + completedParts = append(completedParts, completedPart) + } + + // list parts + parts, err := svc.ListParts(&s3.ListPartsInput{ + Bucket: input.Bucket, + Key: input.Key, + MaxParts: nil, + PartNumberMarker: nil, + RequestPayer: nil, + UploadId: resp.UploadId, + }) + if err != nil { + fmt.Println(err.Error()) + return + } + fmt.Printf("list parts: %d\n", len(parts.Parts)) + for i, part := range parts.Parts { + fmt.Printf("part %d: %v\n", i, part) + } + + + completeResponse, err := completeMultipartUpload(svc, resp, completedParts) + if err != nil { + fmt.Println(err.Error()) + return + } + + fmt.Printf("Successfully uploaded file: %s\n", completeResponse.String()) +} + +func completeMultipartUpload(svc *s3.S3, resp *s3.CreateMultipartUploadOutput, completedParts []*s3.CompletedPart) (*s3.CompleteMultipartUploadOutput, error) { + completeInput := &s3.CompleteMultipartUploadInput{ + Bucket: resp.Bucket, + Key: resp.Key, + UploadId: resp.UploadId, + MultipartUpload: &s3.CompletedMultipartUpload{ + Parts: completedParts, + }, + } + return svc.CompleteMultipartUpload(completeInput) +} + +func uploadPart(svc *s3.S3, resp *s3.CreateMultipartUploadOutput, fileBytes []byte, partNumber int) (*s3.CompletedPart, error) { + tryNum := 1 + partInput := &s3.UploadPartInput{ + Body: bytes.NewReader(fileBytes), + Bucket: resp.Bucket, + Key: resp.Key, + PartNumber: aws.Int64(int64(partNumber)), + UploadId: resp.UploadId, + ContentLength: aws.Int64(int64(len(fileBytes))), + } + + for tryNum <= maxRetries { + uploadResult, err := svc.UploadPart(partInput) + if err != nil { + if tryNum == maxRetries { + if aerr, ok := err.(awserr.Error); ok { + return nil, aerr + } + return nil, err + } + fmt.Printf("Retrying to upload part #%v\n", partNumber) + tryNum++ + } else { + fmt.Printf("Uploaded part #%v\n", partNumber) + return &s3.CompletedPart{ + ETag: uploadResult.ETag, + PartNumber: aws.Int64(int64(partNumber)), + }, nil + } + } + return nil, nil +} + +func abortMultipartUpload(svc *s3.S3, resp *s3.CreateMultipartUploadOutput) error { + fmt.Println("Aborting multipart upload for UploadId#" + *resp.UploadId) + abortInput := &s3.AbortMultipartUploadInput{ + Bucket: resp.Bucket, + Key: resp.Key, + UploadId: resp.UploadId, + } + _, err := svc.AbortMultipartUpload(abortInput) + return err +} |
