Skip to content

Commit

Permalink
Fix custom S3 implementation and test with min.io (#5)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyberj0g authored Aug 11, 2022
1 parent 9304938 commit 4252847
Show file tree
Hide file tree
Showing 4 changed files with 79 additions and 40 deletions.
5 changes: 5 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,14 @@ Livepeer cloud storage upload utility. Called by Mist to upload video segments.
- in case of error, return code is not zero, and error message is returned to stderr as plain text

# Example usage
## S3
```
./catalyst-uploader s3://AWS_KEY:AWS_SECRET@eu-west-1/video-upload-test/test/fa7cb350-8978-4f7d-b54f-b0b67632fcf2.ts
```
## Min.io
```
./catalyst-uploader s3+http://AWS_KEY:AWS_SECRET@localhost:9000/video-upload-test/test/fa7cb350-8978-4f7d-b54f-b0b67632fcf2.ts
```

# Running tests
Some tests require environment variables holding cloud service credentials to be set to run.
110 changes: 71 additions & 39 deletions cmd/catalyst-uploader/catalyst-uploader_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,17 +12,69 @@ import (
"net/url"
"os"
"os/exec"
"path"
"strings"
"testing"
)

func splitNonEmpty(str string, sep rune) []string {
splitFn := func(c rune) bool {
return c == sep
}
return strings.FieldsFunc(str, splitFn)
}

func buildUploader(assert *assert.Assertions) {
// build app
build := exec.Command("go", strings.Split("build catalyst-uploader.go", " ")...)
err := build.Run()
assert.NoError(err)
}

func testE2E(assert *assert.Assertions, fullUriStr string) {
buildUploader(assert)
// create random data
rndData := make([]byte, 1024*128+10)
rand.Read(rndData)
stdinReader := bytes.NewReader(rndData)
// run
args := fullUriStr
uploader := exec.Command("./catalyst-uploader", strings.Split(args, " ")...)
uploader.Stdin = stdinReader
stdoutRes, err := uploader.Output()
fmt.Println(string(stdoutRes))
assert.NoError(err)

// check output
outJson := struct {
Uri string `json:"uri"`
}{}
err = json.Unmarshal(stdoutRes, &outJson)
assert.NoError(err)

// load object and compare contents
outUrl, _ := url.Parse(outJson.Uri)
fullUri, _ := outUrl.Parse(fullUriStr)
bucket := splitNonEmpty(fullUri.Path, '/')[0]
if !strings.Contains(outUrl.Host, bucket) {
// if bucket is not included in domain name of output URI, then it's already in the path
bucket = ""
}
// compare key after leading slash
assert.Equal(fullUri.Path, path.Clean("/"+bucket+"/"+outUrl.Path))
os, err := drivers.ParseOSURL(fullUriStr, true)
assert.NoError(err)
session := os.NewSession("")
// second argument is object key and passed to API unmodified
data, err := session.ReadData(context.Background(), "")
assert.NoError(err)
assert.Equal(*data.Size, int64(len(rndData)))
osBuf := new(bytes.Buffer)
osBuf.ReadFrom(data.Body)
osData := osBuf.Bytes()
assert.Equal(rndData, osData)
}

func TestFsHandlerE2E(t *testing.T) {
assert := assert.New(t)
buildUploader(assert)
Expand Down Expand Up @@ -62,48 +114,28 @@ func TestFsHandlerE2E(t *testing.T) {

func TestS3HandlerE2E(t *testing.T) {
assert := assert.New(t)
buildUploader(assert)
// create random data
rndData := make([]byte, 1024*1024+10)
rand.Read(rndData)
stdinReader := bytes.NewReader(rndData)

s3key := os.Getenv("AWS_TEST_KEY")
s3secret := os.Getenv("AWS_TEST_SECRET")
s3region := os.Getenv("AWS_TEST_REGION")
s3bucket := os.Getenv("AWS_TEST_BUCKET")
s3key := os.Getenv("AWS_S3_KEY")
s3secret := os.Getenv("AWS_S3_SECRET")
s3region := os.Getenv("AWS_S3_REGION")
s3bucket := os.Getenv("AWS_S3_BUCKET")
if s3key != "" && s3secret != "" && s3region != "" && s3bucket != "" {
// run
testKey := "/test/" + uuid.New().String() + ".ts"
args := fmt.Sprintf("s3://%s:%s@%s/%s%s", s3key, s3secret, s3region, s3bucket, testKey)
uploader := exec.Command("./catalyst-uploader", strings.Split(args, " ")...)
uploader.Stdin = stdinReader
stdoutRes, err := uploader.Output()
fmt.Println(string(stdoutRes))
assert.NoError(err)

// check output
outJson := struct {
Uri string `json:"uri"`
}{}
err = json.Unmarshal(stdoutRes, &outJson)
assert.NoError(err)
uri := fmt.Sprintf("s3://%s:%s@%s/%s%s", s3key, s3secret, s3region, s3bucket, testKey)
testE2E(assert, uri)
} else {
fmt.Println("No S3 credentials, test skipped")
}
}

// load object and compare contents
url, _ := url.Parse(outJson.Uri)
// compare key after leading slash
assert.Equal(testKey, url.Path)
os, err := drivers.ParseOSURL(fmt.Sprintf("s3://%s:%s@%s/%s%s", s3key, s3secret, s3region, s3bucket, testKey), true)
assert.NoError(err)
session := os.NewSession("")
// second argument is object key and passed to API unmodified
data, err := session.ReadData(context.Background(), "")
assert.NoError(err)
assert.Equal(*data.Size, int64(len(rndData)))
osBuf := new(bytes.Buffer)
osBuf.ReadFrom(data.Body)
osData := osBuf.Bytes()
assert.Equal(rndData, osData)
func TestMinioHandlerE2E(t *testing.T) {
assert := assert.New(t)
s3key := os.Getenv("MINIO_S3_KEY")
s3secret := os.Getenv("MINIO_S3_SECRET")
s3bucket := os.Getenv("MINIO_S3_BUCKET")
if s3key != "" && s3secret != "" && s3bucket != "" {
testKey := "/test/" + uuid.New().String() + ".ts"
uri := fmt.Sprintf("s3+http://%s:%s@localhost:9000/%s%s", s3key, s3secret, s3bucket, testKey)
testE2E(assert, uri)
} else {
fmt.Println("No S3 credentials, test skipped")
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ require (
github.com/googleapis/gax-go/v2 v2.4.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/livepeer/go-tools v0.0.0-20220805063103-76df6beb6506 // indirect
github.com/livepeer/go-tools v0.0.0-20220811104423-080e77013571 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/objx v0.4.0 // indirect
go.opencensus.io v0.23.0 // indirect
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,8 @@ github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE=
github.com/livepeer/go-tools v0.0.0-20220805063103-76df6beb6506 h1:qKon23c1RQPvL5Oya/hkImbaXNMkt6VdYtnh5jcIhoY=
github.com/livepeer/go-tools v0.0.0-20220805063103-76df6beb6506/go.mod h1:aLVS1DT0ur9kpr0IlNI4DNcm9vVjRRUjDnwuEUm0BdQ=
github.com/livepeer/go-tools v0.0.0-20220811104423-080e77013571 h1:uRIy3pBd1fV5xfKUwOBDbb3gt6RsB0T+YHLc+EFYyNE=
github.com/livepeer/go-tools v0.0.0-20220811104423-080e77013571/go.mod h1:aLVS1DT0ur9kpr0IlNI4DNcm9vVjRRUjDnwuEUm0BdQ=
github.com/pkg/errors v0.9.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
Expand Down

0 comments on commit 4252847

Please sign in to comment.