From 4252847669ebbec888a8a28bed1b82e80176ae23 Mon Sep 17 00:00:00 2001 From: Ivan Poleshchuk Date: Thu, 11 Aug 2022 16:50:50 +0500 Subject: [PATCH] Fix custom S3 implementation and test with min.io (#5) --- README.md | 5 + .../catalyst-uploader_test.go | 110 +++++++++++------- go.mod | 2 +- go.sum | 2 + 4 files changed, 79 insertions(+), 40 deletions(-) diff --git a/README.md b/README.md index cc0a0e2..ad85311 100644 --- a/README.md +++ b/README.md @@ -12,9 +12,14 @@ Livepeer cloud storage upload utility. Called by Mist to upload video segments. - in case of error, return code is not zero, and error message is returned to stderr as plain text # Example usage +## S3 ``` ./catalyst-uploader s3://AWS_KEY:AWS_SECRET@eu-west-1/video-upload-test/test/fa7cb350-8978-4f7d-b54f-b0b67632fcf2.ts ``` +## Min.io +``` +./catalyst-uploader s3+http://AWS_KEY:AWS_SECRET@localhost:9000/video-upload-test/test/fa7cb350-8978-4f7d-b54f-b0b67632fcf2.ts +``` # Running tests Some tests require environment variables holding cloud service credentials to be set to run. diff --git a/cmd/catalyst-uploader/catalyst-uploader_test.go b/cmd/catalyst-uploader/catalyst-uploader_test.go index 16b459c..b243a57 100644 --- a/cmd/catalyst-uploader/catalyst-uploader_test.go +++ b/cmd/catalyst-uploader/catalyst-uploader_test.go @@ -12,10 +12,18 @@ import ( "net/url" "os" "os/exec" + "path" "strings" "testing" ) +func splitNonEmpty(str string, sep rune) []string { + splitFn := func(c rune) bool { + return c == sep + } + return strings.FieldsFunc(str, splitFn) +} + func buildUploader(assert *assert.Assertions) { // build app build := exec.Command("go", strings.Split("build catalyst-uploader.go", " ")...) @@ -23,6 +31,50 @@ func buildUploader(assert *assert.Assertions) { assert.NoError(err) } +func testE2E(assert *assert.Assertions, fullUriStr string) { + buildUploader(assert) + // create random data + rndData := make([]byte, 1024*128+10) + rand.Read(rndData) + stdinReader := bytes.NewReader(rndData) + // run + args := fullUriStr + uploader := exec.Command("./catalyst-uploader", strings.Split(args, " ")...) + uploader.Stdin = stdinReader + stdoutRes, err := uploader.Output() + fmt.Println(string(stdoutRes)) + assert.NoError(err) + + // check output + outJson := struct { + Uri string `json:"uri"` + }{} + err = json.Unmarshal(stdoutRes, &outJson) + assert.NoError(err) + + // load object and compare contents + outUrl, _ := url.Parse(outJson.Uri) + fullUri, _ := outUrl.Parse(fullUriStr) + bucket := splitNonEmpty(fullUri.Path, '/')[0] + if !strings.Contains(outUrl.Host, bucket) { + // if bucket is not included in domain name of output URI, then it's already in the path + bucket = "" + } + // compare key after leading slash + assert.Equal(fullUri.Path, path.Clean("/"+bucket+"/"+outUrl.Path)) + os, err := drivers.ParseOSURL(fullUriStr, true) + assert.NoError(err) + session := os.NewSession("") + // second argument is object key and passed to API unmodified + data, err := session.ReadData(context.Background(), "") + assert.NoError(err) + assert.Equal(*data.Size, int64(len(rndData))) + osBuf := new(bytes.Buffer) + osBuf.ReadFrom(data.Body) + osData := osBuf.Bytes() + assert.Equal(rndData, osData) +} + func TestFsHandlerE2E(t *testing.T) { assert := assert.New(t) buildUploader(assert) @@ -62,48 +114,28 @@ func TestFsHandlerE2E(t *testing.T) { func TestS3HandlerE2E(t *testing.T) { assert := assert.New(t) - buildUploader(assert) - // create random data - rndData := make([]byte, 1024*1024+10) - rand.Read(rndData) - stdinReader := bytes.NewReader(rndData) - - s3key := os.Getenv("AWS_TEST_KEY") - s3secret := os.Getenv("AWS_TEST_SECRET") - s3region := os.Getenv("AWS_TEST_REGION") - s3bucket := os.Getenv("AWS_TEST_BUCKET") + s3key := os.Getenv("AWS_S3_KEY") + s3secret := os.Getenv("AWS_S3_SECRET") + s3region := os.Getenv("AWS_S3_REGION") + s3bucket := os.Getenv("AWS_S3_BUCKET") if s3key != "" && s3secret != "" && s3region != "" && s3bucket != "" { - // run testKey := "/test/" + uuid.New().String() + ".ts" - args := fmt.Sprintf("s3://%s:%s@%s/%s%s", s3key, s3secret, s3region, s3bucket, testKey) - uploader := exec.Command("./catalyst-uploader", strings.Split(args, " ")...) - uploader.Stdin = stdinReader - stdoutRes, err := uploader.Output() - fmt.Println(string(stdoutRes)) - assert.NoError(err) - - // check output - outJson := struct { - Uri string `json:"uri"` - }{} - err = json.Unmarshal(stdoutRes, &outJson) - assert.NoError(err) + uri := fmt.Sprintf("s3://%s:%s@%s/%s%s", s3key, s3secret, s3region, s3bucket, testKey) + testE2E(assert, uri) + } else { + fmt.Println("No S3 credentials, test skipped") + } +} - // load object and compare contents - url, _ := url.Parse(outJson.Uri) - // compare key after leading slash - assert.Equal(testKey, url.Path) - os, err := drivers.ParseOSURL(fmt.Sprintf("s3://%s:%s@%s/%s%s", s3key, s3secret, s3region, s3bucket, testKey), true) - assert.NoError(err) - session := os.NewSession("") - // second argument is object key and passed to API unmodified - data, err := session.ReadData(context.Background(), "") - assert.NoError(err) - assert.Equal(*data.Size, int64(len(rndData))) - osBuf := new(bytes.Buffer) - osBuf.ReadFrom(data.Body) - osData := osBuf.Bytes() - assert.Equal(rndData, osData) +func TestMinioHandlerE2E(t *testing.T) { + assert := assert.New(t) + s3key := os.Getenv("MINIO_S3_KEY") + s3secret := os.Getenv("MINIO_S3_SECRET") + s3bucket := os.Getenv("MINIO_S3_BUCKET") + if s3key != "" && s3secret != "" && s3bucket != "" { + testKey := "/test/" + uuid.New().String() + ".ts" + uri := fmt.Sprintf("s3+http://%s:%s@localhost:9000/%s%s", s3key, s3secret, s3bucket, testKey) + testE2E(assert, uri) } else { fmt.Println("No S3 credentials, test skipped") } diff --git a/go.mod b/go.mod index 5d5134d..44b8deb 100644 --- a/go.mod +++ b/go.mod @@ -25,7 +25,7 @@ require ( github.com/googleapis/gax-go/v2 v2.4.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect github.com/kr/text v0.2.0 // indirect - github.com/livepeer/go-tools v0.0.0-20220805063103-76df6beb6506 // indirect + github.com/livepeer/go-tools v0.0.0-20220811104423-080e77013571 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect github.com/stretchr/objx v0.4.0 // indirect go.opencensus.io v0.23.0 // indirect diff --git a/go.sum b/go.sum index 337c099..b8606dc 100644 --- a/go.sum +++ b/go.sum @@ -209,6 +209,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= github.com/livepeer/go-tools v0.0.0-20220805063103-76df6beb6506 h1:qKon23c1RQPvL5Oya/hkImbaXNMkt6VdYtnh5jcIhoY= github.com/livepeer/go-tools v0.0.0-20220805063103-76df6beb6506/go.mod h1:aLVS1DT0ur9kpr0IlNI4DNcm9vVjRRUjDnwuEUm0BdQ= +github.com/livepeer/go-tools v0.0.0-20220811104423-080e77013571 h1:uRIy3pBd1fV5xfKUwOBDbb3gt6RsB0T+YHLc+EFYyNE= +github.com/livepeer/go-tools v0.0.0-20220811104423-080e77013571/go.mod h1:aLVS1DT0ur9kpr0IlNI4DNcm9vVjRRUjDnwuEUm0BdQ= github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=