Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions cmd/client/workflow_registry_v2_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package client
import (
"encoding/hex"
"errors"
"fmt"
"math/big"
"time"

Expand Down Expand Up @@ -387,6 +388,19 @@ func (wrc *WorkflowRegistryV2Client) GetMaxWorkflowsPerUserDON(user common.Addre
return val, err
}

func (wrc *WorkflowRegistryV2Client) GetMaxWorkflowsPerUserDONByFamily(user common.Address, donFamily string) (uint32, error) {
contract, err := workflow_registry_v2_wrapper.NewWorkflowRegistry(wrc.ContractAddress, wrc.EthClient.Client)
if err != nil {
wrc.Logger.Error().Err(err).Msg("Failed to connect for GetMaxWorkflowsPerUserDONByFamily")
return 0, err
}
val, err := contract.GetMaxWorkflowsPerUserDON(wrc.EthClient.NewCallOpts(), user, donFamily)
if err != nil {
wrc.Logger.Error().Err(err).Msg("GetMaxWorkflowsPerUserDONByFamily call failed")
}
return val, err
}

func (wrc *WorkflowRegistryV2Client) IsAllowedSigner(signer common.Address) (bool, error) {
contract, err := workflow_registry_v2_wrapper.NewWorkflowRegistry(wrc.ContractAddress, wrc.EthClient.Client)
if err != nil {
Expand Down Expand Up @@ -531,6 +545,67 @@ func (wrc *WorkflowRegistryV2Client) GetWorkflowListByOwnerAndName(owner common.
return result, err
}

func (wrc *WorkflowRegistryV2Client) GetWorkflowListByOwner(owner common.Address, start, limit *big.Int) ([]workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView, error) {
contract, err := workflow_registry_v2_wrapper.NewWorkflowRegistry(wrc.ContractAddress, wrc.EthClient.Client)
if err != nil {
wrc.Logger.Error().Err(err).Msg("Failed to connect for GetWorkflowListByOwner")
return nil, err
}

result, err := callContractMethodV2(wrc, func() ([]workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView, error) {
return contract.GetWorkflowListByOwner(wrc.EthClient.NewCallOpts(), owner, start, limit)
})
if err != nil {
wrc.Logger.Error().Err(err).Msg("GetWorkflowListByOwner call failed")
}
return result, err
}

func (wrc *WorkflowRegistryV2Client) CheckUserDonLimit(
owner common.Address,
donFamily string,
pending uint32,
) error {
const workflowStatusActive = uint8(0)
const workflowListPageSize = int64(200)

maxAllowed, err := wrc.GetMaxWorkflowsPerUserDONByFamily(owner, donFamily)
if err != nil {
return fmt.Errorf("failed to fetch per-user workflow limit: %w", err)
}

var currentActive uint32
start := big.NewInt(0)
limit := big.NewInt(workflowListPageSize)

for {
list, err := wrc.GetWorkflowListByOwner(owner, start, limit)
if err != nil {
return fmt.Errorf("failed to check active workflows for DON %s: %w", donFamily, err)
}
if len(list) == 0 {
break
}

for _, workflow := range list {
if workflow.Status == workflowStatusActive && workflow.DonFamily == donFamily {
currentActive++
}
}

start = big.NewInt(start.Int64() + int64(len(list)))
if int64(len(list)) < workflowListPageSize {
break
}
}

if currentActive+pending > maxAllowed {
return fmt.Errorf("workflow limit reached for DON %s: %d/%d active workflows", donFamily, currentActive, maxAllowed)
}

return nil
}

func (wrc *WorkflowRegistryV2Client) DeleteWorkflow(workflowID [32]byte) (*TxOutput, error) {
contract, err := workflow_registry_v2_wrapper.NewWorkflowRegistry(wrc.ContractAddress, wrc.EthClient.Client)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions cmd/workflow/activate/activate.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,10 @@ func (h *handler) Execute() error {
return fmt.Errorf("workflow is already active, cancelling transaction")
}

if err := h.wrc.CheckUserDonLimit(ownerAddr, h.inputs.DonFamily, 1); err != nil {
return err
}

fmt.Printf("Activating workflow: Name=%s, Owner=%s, WorkflowID=%s\n", workflowName, workflowOwner, hex.EncodeToString(latest.WorkflowId[:]))

txOut, err := h.wrc.ActivateWorkflow(latest.WorkflowId, h.inputs.DonFamily)
Expand Down
12 changes: 12 additions & 0 deletions cmd/workflow/deploy/deploy.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,18 @@ func (h *handler) Execute() error {
}
}

if err := checkUserDonLimitBeforeDeploy(
h.wrc,
h.wrc,
common.HexToAddress(h.inputs.WorkflowOwner),
h.inputs.DonFamily,
h.inputs.WorkflowName,
h.inputs.KeepAlive,
h.existingWorkflowStatus,
); err != nil {
return err
}

fmt.Println("\nUploading files...")
if err := h.uploadArtifacts(); err != nil {
return fmt.Errorf("failed to upload workflow: %w", err)
Expand Down
81 changes: 81 additions & 0 deletions cmd/workflow/deploy/deploy_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,15 @@ package deploy

import (
"errors"
"math/big"
"testing"

"github.com/ethereum/go-ethereum/common"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

workflow_registry_v2_wrapper "github.com/smartcontractkit/chainlink-evm/gethwrappers/workflow/generated/workflow_registry_wrapper_v2"

"github.com/smartcontractkit/cre-cli/internal/testutil/chainsim"
"github.com/smartcontractkit/cre-cli/internal/validation"
)
Expand Down Expand Up @@ -151,3 +155,80 @@ func TestWorkflowDeployCommand(t *testing.T) {
func stringPtr(s string) *string {
return &s
}

type fakeUserDonLimitClient struct {
maxAllowed uint32
workflowsByOwner []workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView
workflowsByOwnerName []workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView
}

func (f fakeUserDonLimitClient) CheckUserDonLimit(owner common.Address, donFamily string, pending uint32) error {
var currentActive uint32
for _, workflow := range f.workflowsByOwner {
if workflow.Owner == owner && workflow.Status == workflowStatusActive && workflow.DonFamily == donFamily {
currentActive++
}
}

if currentActive+pending > f.maxAllowed {
return errors.New("workflow limit reached")
}
return nil
}

func (f fakeUserDonLimitClient) GetWorkflowListByOwnerAndName(common.Address, string, *big.Int, *big.Int) ([]workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView, error) {
return f.workflowsByOwnerName, nil
}

func TestCheckUserDonLimitBeforeDeploy(t *testing.T) {
owner := common.HexToAddress(chainsim.TestAddress)
donFamily := "test-don"
workflowName := "test-workflow"

t.Run("errors when limit reached", func(t *testing.T) {
client := fakeUserDonLimitClient{
maxAllowed: 2,
workflowsByOwner: []workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView{
{Owner: owner, Status: workflowStatusActive, DonFamily: donFamily},
{Owner: owner, Status: workflowStatusActive, DonFamily: donFamily},
},
}
nameLookup := fakeUserDonLimitClient{}

err := checkUserDonLimitBeforeDeploy(client, nameLookup, owner, donFamily, workflowName, true, nil)
require.Error(t, err)
assert.Contains(t, err.Error(), "workflow limit reached")
})

t.Run("accounts for keepAlive false pausing same-name workflows", func(t *testing.T) {
client := fakeUserDonLimitClient{
maxAllowed: 2,
workflowsByOwner: []workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView{
{Owner: owner, Status: workflowStatusActive, DonFamily: donFamily},
{Owner: owner, Status: workflowStatusActive, DonFamily: donFamily},
},
}
nameLookup := fakeUserDonLimitClient{
workflowsByOwnerName: []workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView{
{Owner: owner, Status: workflowStatusActive, DonFamily: donFamily},
},
}

err := checkUserDonLimitBeforeDeploy(client, nameLookup, owner, donFamily, workflowName, false, nil)
require.NoError(t, err)
})

t.Run("skips check when updating existing workflow", func(t *testing.T) {
client := fakeUserDonLimitClient{
maxAllowed: 1,
workflowsByOwner: []workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView{
{Owner: owner, Status: workflowStatusActive, DonFamily: donFamily},
},
}
nameLookup := fakeUserDonLimitClient{}
existingStatus := uint8(0)

err := checkUserDonLimitBeforeDeploy(client, nameLookup, owner, donFamily, workflowName, true, &existingStatus)
require.NoError(t, err)
})
}
90 changes: 90 additions & 0 deletions cmd/workflow/deploy/limits.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package deploy

import (
"fmt"
"math/big"

"github.com/ethereum/go-ethereum/common"

workflow_registry_v2_wrapper "github.com/smartcontractkit/chainlink-evm/gethwrappers/workflow/generated/workflow_registry_wrapper_v2"
)

const (
workflowStatusActive = uint8(0)
workflowListPageSize = int64(200)
)

type workflowNameLookupClient interface {
GetWorkflowListByOwnerAndName(owner common.Address, workflowName string, start, limit *big.Int) ([]workflow_registry_v2_wrapper.WorkflowRegistryWorkflowMetadataView, error)
}

type userDonLimitChecker interface {
CheckUserDonLimit(owner common.Address, donFamily string, pending uint32) error
}

func checkUserDonLimitBeforeDeploy(
limitChecker userDonLimitChecker,
nameLookup workflowNameLookupClient,
owner common.Address,
donFamily string,
workflowName string,
keepAlive bool,
existingWorkflowStatus *uint8,
) error {
if existingWorkflowStatus != nil {
return nil
}

pending := uint32(1)
if !keepAlive {
activeSameName, err := countActiveWorkflowsByOwnerNameAndDON(nameLookup, owner, workflowName, donFamily)
if err != nil {
return fmt.Errorf("failed to check active workflows for %s on DON %s: %w", workflowName, donFamily, err)
}
if activeSameName >= pending {
pending = 0
} else {
pending -= activeSameName
}
}

if pending == 0 {
return nil
}

return limitChecker.CheckUserDonLimit(owner, donFamily, pending)
}

func countActiveWorkflowsByOwnerNameAndDON(
wrc workflowNameLookupClient,
owner common.Address,
workflowName string,
donFamily string,
) (uint32, error) {
var count uint32
start := big.NewInt(0)
limit := big.NewInt(workflowListPageSize)

for {
list, err := wrc.GetWorkflowListByOwnerAndName(owner, workflowName, start, limit)
if err != nil {
return 0, err
}
if len(list) == 0 {
break
}

for _, workflow := range list {
if workflow.Status == workflowStatusActive && workflow.DonFamily == donFamily {
count++
}
}

start = big.NewInt(start.Int64() + int64(len(list)))
if int64(len(list)) < workflowListPageSize {
break
}
}

return count, nil
}
Loading