diff --git a/artifacts.go b/artifacts.go index 8c863d0..6ccee23 100644 --- a/artifacts.go +++ b/artifacts.go @@ -5,6 +5,7 @@ import ( "encoding/hex" "fmt" "io" + "os" artifactFS "github.com/flanksource/artifacts/fs" "github.com/flanksource/duty/context" @@ -36,10 +37,60 @@ func (t *MIMEWriter) Detect() *mimetype.MIME { return mimetype.Detect(t.buffer) } +// readerWithLength wraps an io.Reader and carries content length information +type readerWithLength struct { + reader io.Reader + length int64 +} + +func (r *readerWithLength) Read(p []byte) (n int, err error) { + return r.reader.Read(p) +} + +// ContentLength returns the content length if known, -1 otherwise +func (r *readerWithLength) ContentLength() int64 { + return r.length +} + +// determineContentLength attempts to determine the content length from the reader +// using type assertions and heuristics +func determineContentLength(r io.Reader) int64 { + // Try common interfaces that provide size information + switch v := r.(type) { + case interface{ Len() int }: + return int64(v.Len()) + case interface{ Size() int64 }: + return v.Size() + case *os.File: + if stat, err := v.Stat(); err == nil { + return stat.Size() + } + } + + // Check if it's a bytes.Reader or strings.Reader + if seeker, ok := r.(io.Seeker); ok { + // Get current position + current, err := seeker.Seek(0, io.SeekCurrent) + if err == nil { + // Seek to end to get size + end, err := seeker.Seek(0, io.SeekEnd) + if err == nil { + // Restore original position + _, _ = seeker.Seek(current, io.SeekStart) + return end - current + } + } + } + + // Unknown length + return -1 +} + type Artifact struct { - ContentType string - Path string - Content io.ReadCloser + ContentType string + Path string + Content io.ReadCloser + ContentLength int64 // Optional: content length if known, -1 if unknown } const maxBytesForMimeDetection = 512 * 1024 // 512KB @@ -47,13 +98,24 @@ const maxBytesForMimeDetection = 512 * 1024 // 512KB func SaveArtifact(ctx context.Context, fs artifactFS.FilesystemRW, artifact *models.Artifact, data Artifact) error { defer func() { _ = data.Content.Close() }() + // Determine content length if not already provided + if data.ContentLength < 0 { + data.ContentLength = determineContentLength(data.Content) + } + checksum := sha256.New() mimeReader := io.TeeReader(data.Content, checksum) mimeWriter := &MIMEWriter{Max: maxBytesForMimeDetection} fileReader := io.TeeReader(mimeReader, mimeWriter) - info, err := fs.Write(ctx, data.Path, fileReader) + // Create a reader wrapper that carries the content length + wrappedReader := &readerWithLength{ + reader: fileReader, + length: data.ContentLength, + } + + info, err := fs.Write(ctx, data.Path, wrappedReader) if err != nil { return fmt.Errorf("error writing artifact(%s): %w", data.Path, err) } diff --git a/fs/s3.go b/fs/s3.go index 91c0d30..ae03a07 100644 --- a/fs/s3.go +++ b/fs/s3.go @@ -1,6 +1,7 @@ package fs import ( + "bytes" gocontext "context" "io" "io/fs" @@ -151,10 +152,29 @@ func (t *s3FS) Read(ctx gocontext.Context, key string) (io.ReadCloser, error) { } func (t *s3FS) Write(ctx gocontext.Context, path string, data io.Reader) (os.FileInfo, error) { + // Try to determine content length from the reader using type-based heuristics + contentLength := getContentLength(data) + + var body io.Reader + if contentLength >= 0 { + // Content length is known, use the reader directly + body = data + } else { + // Content length unknown, need to buffer to determine size + // This is required because S3 PutObject requires Content-Length header + content, err := io.ReadAll(data) + if err != nil { + return nil, err + } + contentLength = int64(len(content)) + body = bytes.NewReader(content) + } + _, err := t.Client.PutObject(ctx, &s3.PutObjectInput{ - Bucket: aws.String(t.Bucket), - Key: aws.String(path), - Body: data, + Bucket: aws.String(t.Bucket), + Key: aws.String(path), + Body: body, + ContentLength: &contentLength, }) if err != nil { @@ -163,3 +183,45 @@ func (t *s3FS) Write(ctx gocontext.Context, path string, data io.Reader) (os.Fil return t.Stat(path) } + +// getContentLength attempts to determine content length from the reader using heuristics +func getContentLength(r io.Reader) int64 { + // Check for our custom readerWithLength wrapper + if rwl, ok := r.(interface{ ContentLength() int64 }); ok { + return rwl.ContentLength() + } + + // Try common interfaces that provide size information + switch v := r.(type) { + case interface{ Len() int }: + return int64(v.Len()) + case interface{ Size() int64 }: + return v.Size() + case *bytes.Reader: + return int64(v.Len()) + case *strings.Reader: + return int64(v.Len()) + case *os.File: + if stat, err := v.Stat(); err == nil { + return stat.Size() + } + } + + // Check if it's a seeker (but don't modify position for streaming readers) + if seeker, ok := r.(io.Seeker); ok { + // Only try this for known seekable types + if _, isBytesReader := r.(*bytes.Reader); isBytesReader { + current, err := seeker.Seek(0, io.SeekCurrent) + if err == nil { + end, err := seeker.Seek(0, io.SeekEnd) + if err == nil { + _, _ = seeker.Seek(current, io.SeekStart) + return end - current + } + } + } + } + + // Unknown length + return -1 +} diff --git a/fs/s3_e2e_test.go b/fs/s3_e2e_test.go new file mode 100644 index 0000000..7f707a4 --- /dev/null +++ b/fs/s3_e2e_test.go @@ -0,0 +1,239 @@ +package fs + +import ( + gocontext "context" + "crypto/sha256" + "encoding/hex" + "io" + "strings" + "testing" + "time" + + "github.com/flanksource/duty/connection" + "github.com/flanksource/duty/context" + "github.com/flanksource/duty/types" +) + +// TestS3E2E_ContentLength tests the S3 Content-Length fix with LocalStack +// This test validates that PutObject works correctly with Content-Length header +// when using io.Reader chains (TeeReaders) as used in the artifact system +func TestS3E2E_ContentLength(t *testing.T) { + if testing.Short() { + t.Skip("skipping E2E test in short mode") + } + + parentCtx, cancel := gocontext.WithTimeout(gocontext.Background(), 2*time.Minute) + defer cancel() + + ctx := context.NewContext(parentCtx) + + // Test with LocalStack + t.Run("LocalStack", func(t *testing.T) { + testS3ContentLength(t, ctx, "http://localhost:4566", "test", "us-east-1", "test", "test") + }) + + // Test with MinIO as well for compatibility + t.Run("MinIO", func(t *testing.T) { + testS3ContentLength(t, ctx, "http://localhost:9000", "test", "us-east-1", "minioadmin", "minioadmin") + }) +} + +func testS3ContentLength(t *testing.T, ctx context.Context, endpoint, bucket, region, accessKey, secretKey string) { + t.Helper() + + // Create S3 filesystem client + s3FS, err := NewS3FS(ctx, bucket, connection.S3Connection{ + Bucket: bucket, + UsePathStyle: true, + AWSConnection: connection.AWSConnection{ + AccessKey: types.EnvVar{ValueStatic: accessKey}, + SecretKey: types.EnvVar{ValueStatic: secretKey}, + Region: region, + Endpoint: endpoint, + SkipTLSVerify: true, + }, + }) + if err != nil { + t.Fatalf("failed to create S3 filesystem: %v", err) + } + defer s3FS.Close() + + // Create bucket if it doesn't exist + createBucket(t, s3FS.Client, bucket) + + // Test 1: Simple write with small content + t.Run("SimpleWrite", func(t *testing.T) { + content := "Hello, World!" + key := "test-simple.txt" + + info, err := s3FS.Write(ctx, key, strings.NewReader(content)) + if err != nil { + t.Fatalf("failed to write to S3: %v", err) + } + + if info.Size() != int64(len(content)) { + t.Errorf("expected size %d, got %d", len(content), info.Size()) + } + + // Verify we can read it back + reader, err := s3FS.Read(ctx, key) + if err != nil { + t.Fatalf("failed to read from S3: %v", err) + } + defer reader.Close() + + readContent, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("failed to read content: %v", err) + } + + if string(readContent) != content { + t.Errorf("expected content %q, got %q", content, string(readContent)) + } + }) + + // Test 2: Write with TeeReader chain (simulates artifact saving with checksum) + // This is the critical test case that was failing before the fix + t.Run("WriteWithTeeReader", func(t *testing.T) { + content := `{"status": "success", "data": [1,2,3,4,5], "message": "test artifact"}` + key := "test-artifact.json" + + // Create a TeeReader chain similar to what SaveArtifact does + checksum := sha256.New() + teeReader := io.TeeReader(strings.NewReader(content), checksum) + + info, err := s3FS.Write(ctx, key, teeReader) + if err != nil { + t.Fatalf("failed to write with TeeReader to S3: %v", err) + } + + if info.Size() != int64(len(content)) { + t.Errorf("expected size %d, got %d", len(content), info.Size()) + } + + // Verify checksum was calculated + expectedChecksum := sha256.Sum256([]byte(content)) + actualChecksum := hex.EncodeToString(checksum.Sum(nil)) + expectedChecksumStr := hex.EncodeToString(expectedChecksum[:]) + + if actualChecksum != expectedChecksumStr { + t.Errorf("checksum mismatch: expected %s, got %s", expectedChecksumStr, actualChecksum) + } + + // Verify we can read it back + reader, err := s3FS.Read(ctx, key) + if err != nil { + t.Fatalf("failed to read from S3: %v", err) + } + defer reader.Close() + + readContent, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("failed to read content: %v", err) + } + + if string(readContent) != content { + t.Errorf("expected content %q, got %q", content, string(readContent)) + } + }) + + // Test 3: Write with multiple TeeReaders (most complex case) + t.Run("WriteWithMultipleTeeReaders", func(t *testing.T) { + content := strings.Repeat("abcdefghijklmnopqrstuvwxyz", 100) // ~2.6KB + key := "test-large-artifact.txt" + + // Create multiple TeeReaders to simulate checksum + MIME detection + checksum := sha256.New() + teeReader1 := io.TeeReader(strings.NewReader(content), checksum) + + // Second TeeReader for MIME detection simulation + var mimeBuffer []byte + mimeWriter := &testWriter{buffer: &mimeBuffer, max: 512} + teeReader2 := io.TeeReader(teeReader1, mimeWriter) + + info, err := s3FS.Write(ctx, key, teeReader2) + if err != nil { + t.Fatalf("failed to write with multiple TeeReaders to S3: %v", err) + } + + if info.Size() != int64(len(content)) { + t.Errorf("expected size %d, got %d", len(content), info.Size()) + } + + // Verify checksum + expectedChecksum := sha256.Sum256([]byte(content)) + actualChecksum := hex.EncodeToString(checksum.Sum(nil)) + expectedChecksumStr := hex.EncodeToString(expectedChecksum[:]) + + if actualChecksum != expectedChecksumStr { + t.Errorf("checksum mismatch: expected %s, got %s", expectedChecksumStr, actualChecksum) + } + + // Verify we can read it back + reader, err := s3FS.Read(ctx, key) + if err != nil { + t.Fatalf("failed to read from S3: %v", err) + } + defer reader.Close() + + readContent, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("failed to read content: %v", err) + } + + if string(readContent) != content { + t.Errorf("content mismatch: lengths differ (expected %d, got %d)", len(content), len(readContent)) + } + }) + + // Test 4: Large file (tests that buffering works with larger payloads) + t.Run("LargeFile", func(t *testing.T) { + // Create a 1MB file + content := strings.Repeat("x", 1024*1024) + key := "test-large-file.bin" + + info, err := s3FS.Write(ctx, key, strings.NewReader(content)) + if err != nil { + t.Fatalf("failed to write large file to S3: %v", err) + } + + if info.Size() != int64(len(content)) { + t.Errorf("expected size %d, got %d", len(content), info.Size()) + } + + // Verify we can read it back + reader, err := s3FS.Read(ctx, key) + if err != nil { + t.Fatalf("failed to read from S3: %v", err) + } + defer reader.Close() + + readContent, err := io.ReadAll(reader) + if err != nil { + t.Fatalf("failed to read content: %v", err) + } + + if len(readContent) != len(content) { + t.Errorf("size mismatch: expected %d, got %d", len(content), len(readContent)) + } + }) +} + +// testWriter is a simple writer that buffers up to max bytes for testing +type testWriter struct { + buffer *[]byte + max int +} + +func (w *testWriter) Write(p []byte) (n int, err error) { + if len(*w.buffer) >= w.max { + return len(p), nil // Don't buffer more, but pretend we wrote it + } + + remaining := w.max - len(*w.buffer) + if remaining > len(p) { + remaining = len(p) + } + *w.buffer = append(*w.buffer, p[:remaining]...) + return len(p), nil +} diff --git a/fs/testdata/README.md b/fs/testdata/README.md new file mode 100644 index 0000000..fab13b4 --- /dev/null +++ b/fs/testdata/README.md @@ -0,0 +1,119 @@ +# E2E Integration Tests + +This directory contains end-to-end integration tests for the artifacts filesystem implementations. + +## Prerequisites + +- Docker and Docker Compose +- Go 1.24+ + +## Running Tests + +### Start Test Services + +First, start the required services using Docker Compose: + +```bash +cd fs/testdata +docker compose up -d +``` + +This will start: +- **LocalStack**: AWS service emulator (S3 on port 4566) +- **MinIO**: S3-compatible storage (ports 9000, 9001) +- **Fake GCS Server**: Google Cloud Storage emulator (port 4443) +- **SFTP Server**: SSH/SFTP server (port 2222) +- **Samba Server**: SMB/CIFS server (port 445) + +### Wait for Services to be Ready + +```bash +# Check service health +docker compose ps + +# Wait for LocalStack to be ready +until curl -f http://localhost:4566/_localstack/health; do sleep 1; done + +# Wait for MinIO to be ready +until curl -f http://localhost:9000/minio/health/live; do sleep 1; done +``` + +### Run All Tests + +```bash +# From the repository root +make test + +# Or run Go tests directly +go test ./fs -v +``` + +### Run Only E2E Tests + +```bash +# Run S3 E2E tests specifically +go test ./fs -v -run TestS3E2E + +# Skip E2E tests (for quick unit tests) +go test ./fs -v -short +``` + +### Stop Test Services + +```bash +cd fs/testdata +docker compose down +``` + +## Test Structure + +### Unit Tests +- `fs_test.go`: General filesystem tests (reads, writes, globs) + +### E2E Tests +- `s3_e2e_test.go`: S3-specific end-to-end tests + - Tests Content-Length header fix + - Tests with TeeReader chains (simulates artifact saving) + - Tests with LocalStack and MinIO + +## Environment Variables + +The following environment variables can be set to customize test behavior: + +- `TEST_AWS_ACCESS_KEY_ID`: AWS access key (default: "test" for LocalStack, "minioadmin" for MinIO) +- `TEST_AWS_SECRET_ACCESS_KEY`: AWS secret key (default: "test" for LocalStack, "minioadmin" for MinIO) +- `TEST_AWS_REGION`: AWS region (default: "us-east-1") + +## LocalStack vs MinIO + +The E2E tests run against both LocalStack and MinIO: + +- **LocalStack**: More accurate AWS S3 emulation, better for testing AWS-specific behaviors +- **MinIO**: Lightweight S3-compatible storage, faster startup, good for general S3 API testing + +## Troubleshooting + +### Tests Fail with Connection Refused +Make sure Docker services are running: +```bash +cd fs/testdata +docker compose ps +docker compose logs +``` + +### LocalStack Health Check Fails +Check LocalStack logs: +```bash +docker compose logs localstack +``` + +### Port Conflicts +If ports are already in use, you can modify the port mappings in `docker-compose.yml`. + +## CI/CD Integration + +The GitHub Actions workflow in `.github/workflows/test.yml` automatically: +1. Starts Docker Compose services (including LocalStack and MinIO) +2. Waits for services to be healthy +3. Runs all tests including E2E tests +4. Stops services and cleans up diff --git a/fs/testdata/docker-compose.yml b/fs/testdata/docker-compose.yml index 9cca383..3122f20 100644 --- a/fs/testdata/docker-compose.yml +++ b/fs/testdata/docker-compose.yml @@ -23,6 +23,25 @@ services: timeout: 20s retries: 3 + localstack: + image: localstack/localstack:latest + container_name: localstack + ports: + - "4566:4566" + environment: + - SERVICES=s3 + - DEBUG=1 + - AWS_DEFAULT_REGION=us-east-1 + - AWS_ACCESS_KEY_ID=test + - AWS_SECRET_ACCESS_KEY=test + volumes: + - "/var/run/docker.sock:/var/run/docker.sock" + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:4566/_localstack/health"] + interval: 10s + timeout: 5s + retries: 5 + smb: image: dperson/samba ports: