Linear Vesting in Algorand Python

Introduction

A vesting schedule is a mechanism that gradually releases tokens or assets over a specified period of time. In a linear vesting schedule, the release of tokens is linearly distributed over time, meaning that an equal amount of tokens is released at regular intervals until the full allocation is vested.

Let's look at how we can build a linear vesting smart contract using Algorand Python.

The Formula

$$\displaystyle \begin{cases} 0 & \text{for}\: round < start \\allocation & \text{for}\: round \geq duration + start \\\left\lfloor{\frac{allocation \left(round - start\right)}{duration}}\right\rfloor & \text{otherwise} \end{cases}$$

| Symbol | Definition |
| --- | --- |
| allocation | The total amount of MicroAlgos allocated to the contract for vesting. |
| round | The current round number. |
| start | The first round at which funds can be released. |
| duration | The number of rounds that the vesting is linearly distributed over. |

The smart contract will be using the AVM uint64 type for each of these values, so we use floor division to keep it simple.

This means the vesting amounts won't be perfectly linear for all input values, as I'll demonstrate in a moment.
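As a rough illustration, the three cases of the formula can be written as a plain Python function using integer arithmetic. This is only a sketch to make the cases concrete, not the contract code, and the name `linear_vesting` is made up for this example.

```python
def linear_vesting(allocation: int, start: int, duration: int, at: int) -> int:
    """Returns the amount vested at round `at` (hypothetical helper, plain Python ints)."""
    if at < start:
        return 0  # vesting has not begun
    if at >= start + duration:
        return allocation  # fully vested
    # Floor division stands in for unsigned integer division on uint64 values.
    return allocation * (at - start) // duration


# A few sample points for 100 MicroAlgos vesting over 100 rounds from round 0.
samples = [linear_vesting(100, 0, 100, r) for r in (0, 25, 50, 100, 150)]
print(samples)  # [0, 25, 50, 100, 100]
```

Checking the "fully vested" case before dividing also means the division is never reached when duration is 0.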

Visualising the Schedule

We can use sympy to plot vesting schedules.

Let's start by defining the expression:

from sympy import Piecewise, plot, symbols

allocation, at, start, duration = symbols("allocation round start duration", integer=True, positive=True)

expr = Piecewise(
  (0, at < start),
  (allocation, at >= start + duration),
  ((allocation * (at - start)) // duration, True) # True = otherwise
)

We can now use substitution to try out different combinations of values.

Let's start with the simplest example - an allocation of 100 MicroAlgos, starting at round 0, and lasting for a duration of 100 rounds:

schedule = expr.subs([
  (allocation, 100),
  (start, 0),
  (duration, 100)
])

$$\displaystyle \begin{cases} 100 & \text{for}\: round \geq 100 \\round & \text{otherwise} \end{cases}$$

We can plot this using:

plot(
  schedule,
  (at, 0, 150)
)

The plot shows the 100 MicroAlgo allocation vesting at a rate of 1 MicroAlgo per round until round 100.

A more realistic example might be:

schedule = expr.subs([
  (allocation, 1_000_000_000),
  (start, 50_000),
  (duration, 1_000_000)
])

$$\displaystyle \begin{cases} 0 & \text{for}\: round < 50000 \\1000000000 & \text{for}\: round \geq 1050000 \\1000 round - 50000000 & \text{otherwise} \end{cases}$$

plot(
  schedule,
  (at, 0, 1_100_000)
)

And finally, let's look at a scenario where floor division is going to ruin the pretty line:

schedule = expr.subs([
  (allocation, 12),
  (start, 0),
  (duration, 100)
])

$$\displaystyle \begin{cases} 12 & \text{for}\: round \geq 100 \\\left\lfloor{\frac{3 round}{25}}\right\rfloor & \text{otherwise} \end{cases}$$

plot(
  schedule,
  (at, 0, 120)
)

The amounts and intervals might vary slightly for some inputs, but it still works for our use case.
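To make the uneven steps concrete, here is a small plain Python sketch (the helper name `linear_vesting` is hypothetical) that records the first round at which each additional MicroAlgo vests for the 12 MicroAlgo, 100 round schedule:

```python
def linear_vesting(allocation: int, start: int, duration: int, at: int) -> int:
    """Plain Python version of the vesting formula (hypothetical helper)."""
    if at < start:
        return 0
    if at >= start + duration:
        return allocation
    return allocation * (at - start) // duration


# Record the first round at which each additional MicroAlgo becomes vested.
unlock_rounds = []
previous = 0
for r in range(0, 101):
    vested = linear_vesting(12, 0, 100, r)
    if vested > previous:
        unlock_rounds.append(r)
        previous = vested

# Number of rounds between consecutive unlocks.
gaps = [b - a for a, b in zip([0] + unlock_rounds, unlock_rounds)]

print(unlock_rounds)  # [9, 17, 25, 34, 42, 50, 59, 67, 75, 84, 92, 100]
print(gaps)  # [9, 8, 8, 9, 8, 8, 9, 8, 8, 9, 8, 8]
```

Each MicroAlgo unlocks after 8 or 9 rounds rather than the ideal 8.33, which is the staircase visible in the plot.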

There is one other interesting scenario to consider.

What happens if duration is 0?

schedule = expr.subs([
  (allocation, 100_000),
  (start, 50),
  (duration, 0)
])

$$\displaystyle \begin{cases} 0 & \text{for}\: round < 50 \\100000 & \text{otherwise} \end{cases}$$

It behaves like a regular timelock contract, releasing the entire allocation at a specific round.
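The same behaviour can be checked with plain integers. In this sketch (function name hypothetical), the "fully vested" case is tested before the division, so a duration of 0 never triggers a division by zero:

```python
def linear_vesting(allocation: int, start: int, duration: int, at: int) -> int:
    """Plain Python version of the vesting formula (hypothetical helper)."""
    if at < start:
        return 0
    if at >= start + duration:
        return allocation  # reached before the division when duration is 0
    return allocation * (at - start) // duration


# With duration 0 the schedule collapses to a timelock at round `start`.
timelock = [linear_vesting(100_000, 50, 0, r) for r in (48, 49, 50, 51)]
print(timelock)  # [0, 0, 100000, 100000]
```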

Feel free to copy my notebook on GitHub and experiment with different schedules.

Fixed vs. Dynamic Allocation

There's nothing stopping the contract creator (or anyone else) from transferring additional Algos to the contract account after the initial seed payment.

So we have to make a decision in the contract design:

  • Vest only based on the initial seed amount, and allow additional funds to be recovered when the contract is closed.

  • Or vest based on the current balance of the contract account at any given round (factoring in the amount released so far).

I'm going to opt for the second approach because it opens up some more interesting possibilities.

You could, for example, keep topping up the balance of the contract prior to the start of the vesting period.

This is also the approach taken in the OpenZeppelin Solidity vesting contract:

Any token transferred to this contract will follow the vesting schedule as if they were locked from the beginning. Consequently, if the vesting has already started, any amount of tokens sent to this contract will (at least partly) be immediately releasable.

Source: VestingWallet
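A toy off-chain model can show what this means in practice. Everything in the sketch below (`VestingModel`, `linear_vesting`, the amounts) is hypothetical and ignores fees and minimum balance requirements. It only illustrates how a top-up made after vesting has started becomes partly releasable straight away:

```python
from dataclasses import dataclass


def linear_vesting(allocation: int, start: int, duration: int, at: int) -> int:
    """Plain Python version of the vesting formula (hypothetical helper)."""
    if at < start:
        return 0
    if at >= start + duration:
        return allocation
    return allocation * (at - start) // duration


@dataclass
class VestingModel:
    """Toy model of the contract account (illustration only)."""

    balance: int
    start: int
    duration: int
    released: int = 0

    def releasable(self, at: int) -> int:
        # The allocation is the current balance plus everything already paid out.
        allocation = self.balance + self.released
        return linear_vesting(allocation, self.start, self.duration, at) - self.released

    def release(self, at: int) -> int:
        amount = self.releasable(at)
        self.balance -= amount
        self.released += amount
        return amount


model = VestingModel(balance=1_000, start=0, duration=100)
first = model.release(at=50)  # half of the 1_000 allocation has vested
model.balance += 200  # top-up after vesting has started
immediately = model.releasable(at=50)
print(first, immediately)  # 500 100
```

Half of the 200 top-up is releasable immediately, because at round 50 half of the schedule has already elapsed.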

Writing the Contract

To start, we define an ARC-4 ABI method to create a new application:

class Vesting(ARC4Contract):
    """A linear vesting contract."""

    @arc4.abimethod(create="require")
    def new(self, beneficiary: Account, start: UInt64, duration: UInt64) -> None:
        """Creates a new application.

        Args:
            beneficiary (Account): The account that will receive the vested funds.
            start (UInt64): The round at which the vesting begins.
            duration (UInt64): The number of rounds the vesting is distributed over.
        """
        self.beneficiary = beneficiary
        self.start = start
        self.duration = duration
        self.released = UInt64(0)

The method stores each of its arguments in the application's global state.

It also initialises a state variable called released, which tracks the amount of funds withdrawn from the contract account.

Next, we define a method, callable from on-chain or off-chain code, that calculates the vesting amount 'as at' a specific round:

@arc4.abimethod
def calculate_vesting(self, *, allocation: UInt64, start: UInt64, duration: UInt64, at: UInt64) -> UInt64:
    """Calculates the vesting amount at a specific round.

    Args:
        allocation (UInt64): The total funds allocated for vesting.
        start (UInt64): The round at which the vesting begins.
        duration (UInt64): The number of rounds the vesting is distributed over.
        at (UInt64): The round that the vesting amount will be calculated 'as at'.

    Returns:
        UInt64: The vesting amount.
    """
    if at < start:
        return UInt64(0)
    if at >= start + duration:
        return allocation
    return allocation * (at - start) // duration

I'm choosing to keep this as a pure function, instead of directly accessing the application state, so that it's easier to test.
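One thing worth keeping in mind with uint64 arithmetic: the AVM multiplication opcode fails on overflow, so the intermediate product allocation * (at - start) has to fit in 64 bits. The sketch below is a plain Python check with a hypothetical helper name, not part of the contract. It assumes the division branch is only reached while at - start is less than duration, which follows from the order of the checks above:

```python
UINT64_MAX = 2**64 - 1


def product_fits_uint64(allocation: int, duration: int) -> bool:
    """Checks the largest intermediate product the formula can compute (hypothetical helper)."""
    if duration == 0:
        return True  # the division branch is never reached
    # The largest value of (at - start) in the division branch is duration - 1.
    return allocation * (duration - 1) <= UINT64_MAX


print(product_fits_uint64(1_000_000_000, 1_000_000))  # True
print(product_fits_uint64(10**13, 10**7))  # False
```

The schedules used in this post are well within range, but very large allocations combined with very long durations would need a wider intermediate type.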

Then we can have a separate method that releases funds:

@arc4.abimethod
def release_funds(self) -> None:
    """Transfers vested funds to the beneficiary's account."""
    vested = self.calculate_vesting(
        allocation=Global.current_application_address.balance + self.released,
        start=self.start,
        duration=self.duration,
        at=Global.round,
    )
    releaseable = vested - self.released
    assert releaseable, "No funds to release at the current round"
    self.released += itxn.Payment(receiver=self.beneficiary, amount=releaseable, fee=0).submit().amount

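In case you're wondering why the released amount gets added and then subtracted: the schedule applies to the total allocation, which is the current balance plus everything already paid out. Applying it to the current balance alone gives a different result: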

vesting(balance + released) - released != vesting(balance)
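A quick numeric check of that inequality, using a plain Python stand-in for the formula (the helper name and the amounts are made up for illustration):

```python
def linear_vesting(allocation: int, start: int, duration: int, at: int) -> int:
    """Plain Python version of the vesting formula (hypothetical helper)."""
    if at < start:
        return 0
    if at >= start + duration:
        return allocation
    return allocation * (at - start) // duration


start, duration = 0, 100
balance, released = 500, 500  # state after releasing 500 of 1_000 at round 50

at = 75
correct = linear_vesting(balance + released, start, duration, at) - released
naive = linear_vesting(balance, start, duration, at)
print(correct, naive)  # 250 375
```

Vesting the remaining balance on its own would pay out 375 at round 75, bringing the total released to 875 when only 750 of the 1,000 allocation should have vested.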

Putting It All Together

The code below is also available on GitHub:

from algopy import Account, ARC4Contract, Global, UInt64, arc4, itxn


class Vesting(ARC4Contract):
    """A linear vesting contract."""

    @arc4.abimethod(create="require")
    def new(self, beneficiary: Account, start: UInt64, duration: UInt64) -> None:
        """Creates a new application.

        Args:
            beneficiary (Account): The account that will receive the vested funds.
            start (UInt64): The round at which the vesting begins.
            duration (UInt64): The number of rounds the vesting is distributed over.
        """
        self.beneficiary = beneficiary
        self.start = start
        self.duration = duration
        self.released = UInt64(0)

    @arc4.abimethod
    def calculate_vesting(self, *, allocation: UInt64, start: UInt64, duration: UInt64, at: UInt64) -> UInt64:
        """Calculates the vesting amount at a specific round.

        Args:
            allocation (UInt64): The total funds allocated for vesting.
            start (UInt64): The round at which the vesting begins.
            duration (UInt64): The number of rounds the vesting is distributed over.
            at (UInt64): The round that the vesting amount will be calculated 'as at'.

        Returns:
            UInt64: The vesting amount.
        """
        if at < start:
            return UInt64(0)
        if at >= start + duration:
            return allocation
        return allocation * (at - start) // duration

    @arc4.abimethod
    def release_funds(self) -> None:
        """Transfers vested funds to the beneficiary's account."""
        vested = self.calculate_vesting(
            allocation=Global.current_application_address.balance + self.released,
            start=self.start,
            duration=self.duration,
            at=Global.round,
        )
        releaseable = vested - self.released
        assert releaseable, "No funds to release at the current round"
        self.released += itxn.Payment(receiver=self.beneficiary, amount=releaseable, fee=0).submit().amount

Writing Tests

Although it's normally a good idea to avoid putting logic in tests, I'm doing so here to test the contract against the reference implementation in sympy.

The tests below are also not particularly thorough and could probably be improved with property testing.

Please only use this as a starting point for your own experiments!

The code below is also available on GitHub:

import pytest
from algokit_utils import (
    TransactionParameters,
    TransferParameters,
    get_localnet_default_account,
    transfer,
)
from algokit_utils.config import config
from algosdk.v2client.algod import AlgodClient
from algosdk.v2client.indexer import IndexerClient
from sympy import Piecewise, symbols

from smart_contracts.artifacts.linear_vesting.client import VestingClient


@pytest.fixture(scope="session")
def app_client(algod_client: AlgodClient, indexer_client: IndexerClient) -> VestingClient:
    account = get_localnet_default_account(algod_client)

    config.configure(
        debug=True,
        # trace_all=True,
    )

    client = VestingClient(
        algod_client,
        creator=account,
        indexer_client=indexer_client,
    )

    last_round = algod_client.status()["last-round"]

    client.create_new(
        beneficiary=account.address,
        start=last_round + 10,
        duration=100,
    )

    transfer(
        algod_client,
        TransferParameters(
            from_account=get_localnet_default_account(algod_client),
            to_address=client.app_address,
            micro_algos=1_000_000_000,
        ),
    )

    return client


def vesting_reference(p_allocation: int, p_start: int, p_duration: int, p_round: int) -> int:
    """Linear vesting reference implementation in sympy to test against.

    Args:
        p_allocation (int): The allocation amount to substitute in the expression.
        p_start (int): The start round to substitute in the expression.
        p_duration (int): The duration rounds to substitute in the expression.
        p_round (int): The round number to substitute in the expression.

    Returns:
        int: The integer vesting amount.
    """
    allocation, at, start, duration = symbols("allocation round start duration", integer=True, positive=True)
    expr = Piecewise(
        (0, at < start),
        (allocation, at >= start + duration),
        ((allocation * (at - start)) // duration, True),  # True = otherwise
    )
    return int(expr.subs([(allocation, p_allocation), (start, p_start), (duration, p_duration), (at, p_round)]))


@pytest.mark.parametrize(
    "p_allocation, p_start, p_duration", [(100, 0, 100), (1_000_000_000, 50_000, 1_000_000), (12, 0, 100)]
)
def test_calculate_vesting(app_client: VestingClient, p_allocation: int, p_start: int, p_duration: int) -> None:
    """Tests the calculate_vesting() method against a reference implementation in sympy."""

    # Sample rounds from just before the start up to the end of the vesting period.
    for r in range(max(p_start - 1, 0), p_start + p_duration + 1, min(p_duration, 10_000)):
        ref_amount = vesting_reference(p_allocation, p_start, p_duration, r)

        amount = app_client.calculate_vesting(
            allocation=p_allocation, start=p_start, duration=p_duration, at=r
        ).return_value

        assert ref_amount == amount, "Vesting amount does not match sympy reference calculation"


def test_release_funds(app_client: VestingClient) -> None:
    """Tests the release_funds() method."""

    algod_client = app_client.algod_client
    account = get_localnet_default_account(algod_client)
    balance = lambda a: algod_client.account_info(a)["amount"]
    balance_before = balance(account.address)
    app_balance_before = balance(app_client.app_address)

    state = app_client.get_global_state()
    allocation = app_balance_before + state.released
    start = state.start
    duration = state.duration

    sp = algod_client.suggested_params()
    sp.fee = 2_000
    sp.flat_fee = True
    r = app_client.release_funds(transaction_parameters=TransactionParameters(suggested_params=sp)).confirmed_round

    ref_amount = vesting_reference(allocation, start, duration, r)

    assert balance(account.address) == balance_before + ref_amount - state.released - 2_000
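As a pointer towards the property testing mentioned earlier, here is a dependency-free sketch of the kind of properties that could be checked. It runs against a plain Python stand-in for the formula (hypothetical names throughout) rather than the deployed contract. A library such as Hypothesis could generate the inputs instead of `random`, and the same assertions could be pointed at the contract client.

```python
import random


def linear_vesting(allocation: int, start: int, duration: int, at: int) -> int:
    """Plain Python version of the vesting formula (hypothetical helper)."""
    if at < start:
        return 0
    if at >= start + duration:
        return allocation
    return allocation * (at - start) // duration


def check_properties(trials: int = 1_000, seed: int = 0) -> int:
    """Checks a few schedule properties on random inputs and returns the trial count."""
    rng = random.Random(seed)
    for _ in range(trials):
        allocation = rng.randint(0, 10**12)
        start = rng.randint(0, 10**6)
        duration = rng.randint(0, 10**6)
        # Two rounds with a <= b, somewhere around the vesting period.
        a = rng.randint(0, start + duration + 10)
        b = rng.randint(a, start + duration + 10)

        va = linear_vesting(allocation, start, duration, a)
        vb = linear_vesting(allocation, start, duration, b)

        assert 0 <= va <= allocation  # never more than the allocation
        assert va <= vb  # never decreases as rounds pass
        assert linear_vesting(allocation, start, duration, start + duration) == allocation
        if start > 0:
            assert linear_vesting(allocation, start, duration, start - 1) == 0
    return trials


checked = check_properties()
print(f"{checked} random schedules checked")
```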