Building a Personal NFT Marketplace on Algorand

Introduction

In this post, we're going to look at writing a simple smart contract in Algorand Python that allows a creator to list and sell their NFTs.

The creator will be able to transfer assets to the application and set a sale price for each item using box storage.

A buyer will be able to purchase an NFT if they make a payment to the creator for an amount greater than or equal to the listed price.

💡 This contract is only for demonstration purposes. It has not been audited.

A Quick Note on Terminology

In Algorand Python, the term 'creator' refers to the application creator (the account that deploys the smart contract on the blockchain).

In the context of the contract we're writing, the application creator is also the NFT creator, who is listing the assets for sale.

The Contract Outline

Let's start by defining the contract class and some initial methods:

class PersonalMarketplace(ARC4Contract):
    """A simple contract for selling NFTs."""

    @arc4.abimethod
    def creator(self) -> arc4.Address:
        """A public method that returns the creator's address encoded as an ARC-4 ABI type.

        Returns:
            arc4.Address: The creator's address.
        """
        return arc4.Address(Global.creator_address)

    @subroutine
    def creator_only(self) -> None:
        """Causes the application call to fail if the transaction sender is not the creator."""
        assert Txn.sender == Global.creator_address, "Only the creator can call this method"

The public creator() method makes it easier for buyers to construct transactions off chain, or from another smart contract.

Most of the other methods in the contract are going to be restricted so that only the creator can call them.

The creator_only() method acts as a guard that enforces this condition.

Opting In

In order for the contract to receive a new asset, it first needs to opt in to receive it.

Only the creator should be allowed to call this method:

@arc4.abimethod
def opt_in(self, nft: Asset) -> None:
    """Opts the contract into an asset.

    Args:
        nft (Asset): The asset to opt in to.
    """
    self.creator_only()
    itxn.AssetTransfer(
        xfer_asset=nft,
        asset_receiver=Global.current_application_address,
        asset_amount=0,
        fee=0,
    ).submit()
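
Because the inner transaction sets fee=0, its fee has to be covered by the outer application call through fee pooling. Here is a minimal sketch of that arithmetic, assuming a network minimum fee of 1,000 microAlgos per transaction. The helper name pooled_fee is hypothetical and not part of any SDK:

```python
MIN_TXN_FEE = 1_000  # microAlgos; assumed network minimum fee per transaction


def pooled_fee(inner_txn_count: int, min_fee: int = MIN_TXN_FEE) -> int:
    """Flat fee an app call must pay to cover itself plus its zero-fee inner transactions."""
    if inner_txn_count < 0:
        raise ValueError("inner_txn_count must be non-negative")
    return (1 + inner_txn_count) * min_fee


# opt_in() submits one inner asset transfer, so the caller pays for two transactions
print(pooled_fee(1))  # 2000
```

This lines up with the flat fee of 2,000 microAlgos that the tests at the end of this post set on the suggested params.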

Listing an NFT for Sale

To list an NFT for sale, the creator transfers an asset to the application account.

The creator also provides the minimum price the asset can be sold for, which gets stored in a box:

@subroutine
def box_key(self, nft: Asset) -> Bytes:
    """Returns the box storage key for an NFT.

    Args:
        nft (Asset): The NFT.

    Returns:
        Bytes: The key for the NFT.
    """
    return op.itob(nft.id)

@arc4.abimethod
def list_nft(self, axfer: gtxn.AssetTransferTransaction, price: arc4.UInt64) -> None:
    """Lists an NFT for sale.

    Args:
        axfer (gtxn.AssetTransferTransaction): The transaction transferring the asset for sale to the contract.
        price (arc4.UInt64): The minimum price the asset can be sold for.
    """
    self.creator_only()
    assert (
        axfer.asset_receiver == Global.current_application_address
    ), "Asset receiver must be the application address"
    assert axfer.asset_amount == 1, "Asset amount must be 1"

    # Store sale price in box storage
    op.Box.put(self.box_key(axfer.xfer_asset), price.bytes)
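
Creating a box raises the application account's minimum balance requirement, so the account needs to be funded before the first listing. Here is a rough sketch of the arithmetic, assuming the protocol values I'm aware of: 0.1 Algo base balance, 0.1 Algo per opted-in asset, and 2,500 + 400 × (key length + value length) microAlgos per box. The function name is hypothetical:

```python
BASE_MBR = 100_000          # microAlgos every account must hold (assumed protocol value)
ASSET_OPT_IN_MBR = 100_000  # per asset the account is opted in to (assumed)
BOX_FLAT_MBR = 2_500        # per box (assumed)
BOX_BYTE_MBR = 400          # per byte of box key + box value (assumed)


def app_account_mbr(listed_nfts: int, key_len: int = 8, value_len: int = 8) -> int:
    """Minimum balance for an app account holding `listed_nfts` NFTs with one price box each."""
    per_box = BOX_FLAT_MBR + BOX_BYTE_MBR * (key_len + value_len)
    return BASE_MBR + listed_nfts * (ASSET_OPT_IN_MBR + per_box)


# One listed NFT: 8-byte key (asset ID) and 8-byte value (price)
print(app_account_mbr(1))  # 208900
```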

Let's also add a method that anyone can call to retrieve the sale price for an NFT:

@arc4.abimethod
def price(self, nft: Asset) -> UInt64:
    """Returns the minimum sale price for an NFT.

    Args:
        nft (Asset): The NFT.

    Returns:
        UInt64: The minimum sale price in MicroAlgos.
    """
    value, exists = op.Box.get(self.box_key(nft))
    assert exists, "Price not found"
    return op.btoi(value)
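
op.itob() produces an 8-byte big-endian value, and the arc4.UInt64 price is stored with the same encoding, so both the box name and its contents can be reproduced off chain with plain Python. A small sketch (the helper names are hypothetical):

```python
def box_key(asset_id: int) -> bytes:
    """Off-chain equivalent of op.itob(nft.id): an 8-byte big-endian unsigned integer."""
    return asset_id.to_bytes(8, "big")


def decode_price(box_value: bytes) -> int:
    """Off-chain equivalent of op.btoi() applied to the stored 8-byte price."""
    if len(box_value) != 8:
        raise ValueError("expected an 8-byte uint64")
    return int.from_bytes(box_value, "big")


print(box_key(1295).hex())  # 000000000000050f
print(decode_price((7_500_000_000).to_bytes(8, "big")))  # 7500000000
```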

Purchasing an NFT

To purchase an NFT, the buyer must pay the creator an amount greater than or equal to the listed sale price for the asset.

This is where atomic transactions come into play.

The buyer submits a group of transactions that includes both an application call to purchase an asset and a direct payment to the creator.

The application validates the payment and only releases an asset if the conditions are met.

The transactions in the group are guaranteed to either all succeed or all fail.

@arc4.abimethod
def purchase_nft(self, nft: Asset, payment: gtxn.PaymentTransaction) -> None:
    """Allows a user to purchase an NFT.

    Args:
        nft (Asset): The NFT being purchased.
        payment (gtxn.PaymentTransaction): The payment to the creator.
    """
    assert nft.balance(Global.current_application_address), "NFT not available for purchase"
    assert payment.sender.is_opted_in(nft), "Sender must opt in to receive NFT"
    assert payment.receiver == Global.creator_address, "Payment receiver must be the creator"
    assert payment.amount >= (listed_price := self.price(nft)), "Payment amount must be >= NFT price"

    itxn.AssetTransfer(
        xfer_asset=nft,
        asset_receiver=payment.sender,
        asset_amount=1,
        fee=0,
    ).submit()

The first steps validate that the NFT is available for sale, and that the payment transaction in the group is valid.

If those checks pass, the contract submits an inner transaction to transfer the desired NFT to the buyer.

We could add an additional statement to make sure the transaction sender is the same as the payment sender:

assert payment.sender == Txn.sender, "Payment sender must match transaction sender"

But for flexibility I will leave this out.
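
The assertions in purchase_nft() can be mirrored off chain, for example to pre-check a purchase before submitting the group. This is a plain-Python model, not Algorand Python. The Payment class and can_purchase function are hypothetical names, and the optional sender check is only applied when txn_sender is passed:

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Payment:
    sender: str
    receiver: str
    amount: int  # microAlgos


def can_purchase(
    payment: Payment,
    creator: str,
    listed_price: Optional[int],
    app_holds_nft: bool,
    buyer_opted_in: bool,
    txn_sender: Optional[str] = None,
) -> bool:
    """Model of the purchase_nft() checks; pass txn_sender to also apply the optional sender check."""
    if txn_sender is not None and payment.sender != txn_sender:
        return False
    return (
        app_holds_nft
        and buyer_opted_in
        and payment.receiver == creator
        and listed_price is not None  # a missing box means the NFT is not listed
        and payment.amount >= listed_price
    )


pay = Payment(sender="BUYER", receiver="CREATOR", amount=7_500_000_000)
print(can_purchase(pay, "CREATOR", 7_500_000_000, True, True))  # True
print(can_purchase(pay, "CREATOR", 8_000_000_000, True, True))  # False
print(can_purchase(pay, "CREATOR", 7_500_000_000, True, True, txn_sender="SOMEONE_ELSE"))  # False
```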

Emitting Events

Algorand smart contracts can write logs to the blockchain.

ARC-28 provides a way to write structured log data, using ABI encoding.

We can use it to log the details of each purchase, which can be accessed later using the indexer.

Let's define the structure of the sale event:

class SaleEvent(arc4.Struct):
    asset_id: arc4.UInt64
    listed_price: arc4.UInt64
    amount_paid: arc4.UInt64
    buyer: arc4.Address
    processed_round: arc4.UInt64
    processed_timestamp: arc4.UInt64

And update the purchase_nft() method to emit an event:

@arc4.abimethod
def purchase_nft(self, nft: Asset, payment: gtxn.PaymentTransaction) -> None:
    """Allows a user to purchase an NFT.

    Args:
        nft (Asset): The NFT being purchased.
        payment (gtxn.PaymentTransaction): The payment to the creator.
    """
    assert nft.balance(Global.current_application_address), "NFT not available for purchase"
    assert payment.sender.is_opted_in(nft), "Sender must opt in to receive NFT"
    assert payment.receiver == Global.creator_address, "Payment receiver must be the creator"
    assert payment.amount >= (listed_price := self.price(nft)), "Payment amount must be >= NFT price"

    itxn.AssetTransfer(
        xfer_asset=nft,
        asset_receiver=payment.sender,
        asset_amount=1,
        fee=0,
    ).submit()

    # Log sale event
    arc4.emit(
        SaleEvent(
            arc4.UInt64(nft.id),  # asset_id
            arc4.UInt64(listed_price),  # listed_price
            arc4.UInt64(payment.amount),  # amount_paid
            arc4.Address(payment.sender),  # buyer
            arc4.UInt64(Global.round),  # processed_round
            arc4.UInt64(Global.latest_timestamp),  # processed_timestamp
        )
    )

    # Remove listing
    _deleted = op.Box.delete(self.box_key(nft))

I've added a final step to delete the sale price from box storage as well.

If the creator wants to delist an NFT, they can call this method themselves (making a payment to their own account) and have the asset returned. Note that this still emits a SaleEvent.

Parsing Event Logs (Off Chain)

This was a bit trickier than I was expecting.

I'm not sure if ARC-28 is properly supported yet in the Python algosdk or algokit_utils libraries.

What I decided to do is create a Pydantic model to represent the event log:

from datetime import datetime
from pydantic import BaseModel

class SaleEvent(BaseModel):
    asset_id: int
    listed_price: int
    amount_paid: int
    buyer: str
    processed_round: int
    processed_timestamp: datetime

For this application, we can get the logs by calling the indexer with:

logs = [
    item["logs"][0]
    for item in indexer_client.application_logs(app_client.app_id, min_round=response.confirmed_round)["log-data"]
    if item["logs"]
]
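
The comprehension above keeps only the first log of each transaction. That is enough here, but a transaction can carry several logs. Below is a sketch of a helper that collects all of them, assuming the response has the same "log-data" / "logs" shape used above. The function name and the sample data are made up:

```python
def collect_logs(response: dict) -> list:
    """Return every base64-encoded log from an indexer application-logs response, in order."""
    return [log for item in response.get("log-data", []) for log in item.get("logs", [])]


# Made-up response with the same shape as the indexer result used above
sample = {
    "log-data": [
        {"txid": "TXID-A", "logs": ["AAAA", "BBBB"]},
        {"txid": "TXID-B", "logs": []},
        {"txid": "TXID-C", "logs": ["CCCC"]},
    ]
}
print(collect_logs(sample))  # ['AAAA', 'BBBB', 'CCCC']
```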

The result is a list of strings, which are base64 encoded.

ARC-28 events are prefixed with the first 4 bytes of the SHA-512/256 hash of the event signature.

I found the signature for the SaleEvent in the contract's approval TEAL:

method "SaleEvent(uint64,uint64,uint64,address,uint64,uint64)"

So we can find the sale events by filtering the list of logs to those that have the relevant prefix:

import base64
import hashlib

def _is_sale_event(log: str) -> bool:
    """Helper function to check if a log is a sale event."""
    sale_event_signature = "SaleEvent(uint64,uint64,uint64,address,uint64,uint64)"
    # ARC-28 prefix: first 4 bytes of the SHA-512/256 hash of the signature
    prefix = hashlib.new("sha512_256", sale_event_signature.encode()).digest()[:4]
    return base64.b64decode(log)[:4] == prefix

sale_events = [log for log in logs if _is_sale_event(log)]
assert sale_events, "No sale events found in application logs"

Now to actually decode the log, I used the ABI tuple type from algosdk:

from algosdk import abi

last_sale_event = sale_events[-1]
codec = abi.TupleType.from_string("(uint64,uint64,uint64,address,uint64,uint64)")
# Skip the 4-byte event prefix before decoding
decoded = codec.decode(base64.b64decode(last_sale_event)[4:])

Which returned a list of values that can be passed into the Pydantic model:

>>> event = SaleEvent(**dict(zip(SaleEvent.__annotations__.keys(), decoded)))
>>> event.model_dump()
{'asset_id': 1295,
 'listed_price': 7500000000,
 'amount_paid': 7500000000,
 'buyer': 'WPVRVAGPXGWYE4N75FNOEZOZ3VAS5TWVCRZS5QZ6HPQDRAPDKZIMAGJP4Q',
 'processed_round': 210,
 'processed_timestamp': datetime.datetime(2024, 4, 21, 4, 47, 50, tzinfo=TzInfo(UTC))}
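
Every field in SaleEvent is a static ABI type, so the payload after the prefix has a fixed 72-byte layout. As an alternative to the algosdk codec, here is a stdlib-only sketch of the same decoding. It leaves the buyer as raw public key bytes rather than a checksummed address, and the sample log uses a placeholder prefix and made-up values:

```python
import base64
import struct

# Three uint64s, a 32-byte address (raw public key), then two more uint64s, all big-endian.
SALE_EVENT_LAYOUT = struct.Struct(">QQQ32sQQ")  # 72 bytes


def decode_sale_event(log_b64: str) -> dict:
    """Decode a base64 SaleEvent log without algosdk (buyer stays as raw public key bytes)."""
    payload = base64.b64decode(log_b64)[4:]  # skip the 4-byte ARC-28 prefix
    if len(payload) != SALE_EVENT_LAYOUT.size:
        raise ValueError(f"expected {SALE_EVENT_LAYOUT.size} bytes, got {len(payload)}")
    asset_id, listed_price, amount_paid, buyer, rnd, timestamp = SALE_EVENT_LAYOUT.unpack(payload)
    return {
        "asset_id": asset_id,
        "listed_price": listed_price,
        "amount_paid": amount_paid,
        "buyer_public_key": buyer,
        "processed_round": rnd,
        "processed_timestamp": timestamp,
    }


# Made-up log: a placeholder prefix (not the real selector) followed by the packed fields
fake_log = base64.b64encode(
    b"\x00" * 4 + SALE_EVENT_LAYOUT.pack(1295, 7_500_000_000, 7_500_000_000, b"\x01" * 32, 210, 1_713_674_870)
).decode()
event = decode_sale_event(fake_log)
print(event["asset_id"], event["listed_price"], event["processed_round"])  # 1295 7500000000 210
```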

The Complete Contract Code

The code below is also available on GitHub.

from algopy import (
    ARC4Contract,
    Asset,
    Bytes,
    Global,
    Txn,
    UInt64,
    arc4,
    gtxn,
    itxn,
    op,
    subroutine,
)


class SaleEvent(arc4.Struct):
    asset_id: arc4.UInt64
    listed_price: arc4.UInt64
    amount_paid: arc4.UInt64
    buyer: arc4.Address
    processed_round: arc4.UInt64
    processed_timestamp: arc4.UInt64


class PersonalMarketplace(ARC4Contract):
    """A simple contract for selling NFTs."""

    @arc4.abimethod
    def creator(self) -> arc4.Address:
        """A public method that returns the creator's address encoded as an ARC-4 ABI type.

        Returns:
            arc4.Address: The creator's address.
        """
        return arc4.Address(Global.creator_address)

    @subroutine
    def creator_only(self) -> None:
        """Causes the application call to fail if the transaction sender is not the creator."""
        assert Txn.sender == Global.creator_address, "Only the creator can call this method"

    @arc4.abimethod
    def opt_in(self, nft: Asset) -> None:
        """Opts the contract into an asset.

        Args:
            nft (Asset): The asset to opt in to.
        """
        self.creator_only()
        itxn.AssetTransfer(
            xfer_asset=nft,
            asset_receiver=Global.current_application_address,
            asset_amount=0,
            fee=0,
        ).submit()

    @subroutine
    def box_key(self, nft: Asset) -> Bytes:
        """Returns the box storage key for an NFT.

        Args:
            nft (Asset): The NFT.

        Returns:
            Bytes: The key for the NFT.
        """
        return op.itob(nft.id)

    @arc4.abimethod
    def list_nft(self, axfer: gtxn.AssetTransferTransaction, price: arc4.UInt64) -> None:
        """Lists an NFT for sale.

        Args:
            axfer (gtxn.AssetTransferTransaction): The transaction transferring the asset for sale to the contract.
            price (arc4.UInt64): The minimum price the asset can be sold for.
        """
        self.creator_only()
        assert (
            axfer.asset_receiver == Global.current_application_address
        ), "Asset receiver must be the application address"
        assert axfer.asset_amount == 1, "Asset amount must be 1"

        # Store sale price in box storage
        op.Box.put(self.box_key(axfer.xfer_asset), price.bytes)

    @arc4.abimethod
    def price(self, nft: Asset) -> UInt64:
        """Returns the minimum sale price for an NFT.

        Args:
            nft (Asset): The NFT.

        Returns:
            UInt64: The minimum sale price in MicroAlgos.
        """
        value, exists = op.Box.get(self.box_key(nft))
        assert exists, "Price not found"
        return op.btoi(value)

    @arc4.abimethod
    def purchase_nft(self, nft: Asset, payment: gtxn.PaymentTransaction) -> None:
        """Allows a user to purchase an NFT.

        Args:
            nft (Asset): The NFT being purchased.
            payment (gtxn.PaymentTransaction): The payment to the creator.
        """
        assert nft.balance(Global.current_application_address), "NFT not available for purchase"
        assert payment.sender.is_opted_in(nft), "Sender must opt in to receive NFT"
        assert payment.receiver == Global.creator_address, "Payment receiver must be the creator"
        assert payment.amount >= (listed_price := self.price(nft)), "Payment amount must be >= NFT price"

        itxn.AssetTransfer(
            xfer_asset=nft,
            asset_receiver=payment.sender,
            asset_amount=1,
            fee=0,
        ).submit()

        # Log sale event
        arc4.emit(
            SaleEvent(
                arc4.UInt64(nft.id),  # asset_id
                arc4.UInt64(listed_price),  # listed_price
                arc4.UInt64(payment.amount),  # amount_paid
                arc4.Address(payment.sender),  # buyer
                arc4.UInt64(Global.round),  # processed_round
                arc4.UInt64(Global.latest_timestamp),  # processed_timestamp
            )
        )

        # Remove listing
        _deleted = op.Box.delete(self.box_key(nft))

Writing Tests

There should be enough information in the tests to understand how to interact with the application from off-chain code and integrate it with a front end.

As always, please only use these tests as a starting point for your own work.

They are not exhaustive.

The code below is also available on GitHub.

import base64
import hashlib
import time
from datetime import datetime

import algokit_utils
import pytest
from algokit_utils import (
    Account,
    EnsureBalanceParameters,
    TransactionParameters,
    TransferParameters,
    ensure_funded,
    get_account,
    get_indexer_client,
    opt_in,
    transfer,
)
from algokit_utils.config import config
from algosdk import abi
from algosdk.atomic_transaction_composer import (
    AccountTransactionSigner,
    TransactionWithSigner,
)
from algosdk.transaction import AssetCreateTxn, AssetTransferTxn, PaymentTxn, wait_for_confirmation
from algosdk.util import algos_to_microalgos
from algosdk.v2client.algod import AlgodClient
from algosdk.v2client.indexer import IndexerClient
from pydantic import BaseModel

from smart_contracts.artifacts.personal_marketplace.client import PersonalMarketplaceClient


class SaleEvent(BaseModel):
    asset_id: int
    listed_price: int
    amount_paid: int
    buyer: str
    processed_round: int
    processed_timestamp: datetime


@pytest.fixture(scope="session")
def app_client(account: Account, algod_client: AlgodClient, indexer_client: IndexerClient) -> PersonalMarketplaceClient:
    config.configure(
        debug=True,
        # trace_all=True,
    )

    client = PersonalMarketplaceClient(
        algod_client,
        creator=account,
        indexer_client=indexer_client,
    )

    client.deploy(
        on_schema_break=algokit_utils.OnSchemaBreak.AppendApp,
        on_update=algokit_utils.OnUpdate.AppendApp,
    )

    transfer(
        algod_client,
        TransferParameters(
            from_account=account,
            to_address=client.app_address,
            micro_algos=algos_to_microalgos(7_500),
        ),
    )

    return client


def _list_nft(account: Account, app_client: PersonalMarketplaceClient) -> int:
    """Helper function to create an NFT and list it on the marketplace. Returns the asset ID."""

    sp = app_client.algod_client.suggested_params()
    sp.fee = 2_000
    sp.flat_fee = True

    txn = AssetCreateTxn(
        sender=account.address,
        sp=sp,
        total=1,
        decimals=0,
        default_frozen=False,
        unit_name="NFT",
        asset_name="NFT",
    )
    signed_txn = txn.sign(account.private_key)
    txid = app_client.algod_client.send_transaction(signed_txn)
    txn_result = wait_for_confirmation(app_client.algod_client, txid, 4)
    asset_id = txn_result["asset-index"]

    app_client.opt_in(nft=asset_id, transaction_parameters=TransactionParameters(suggested_params=sp))

    axfer = AssetTransferTxn(
        sender=account.address,
        sp=sp,
        receiver=app_client.app_address,
        amt=1,
        index=asset_id,
    )

    app_client.list_nft(
        axfer=TransactionWithSigner(axfer, AccountTransactionSigner(account.private_key)),
        price=algos_to_microalgos(7_500),
        transaction_parameters=TransactionParameters(suggested_params=sp, boxes=[(0, asset_id)]),
    )

    return asset_id


def _is_sale_event(log: str):
    """Helper function to check if a log is a sale event."""
    sale_event_signature = "SaleEvent(uint64,uint64,uint64,address,uint64,uint64)"
    prefix = hashlib.new("sha512_256", sale_event_signature.encode()).digest()[:4]
    return base64.b64decode(log)[:4] == prefix


def test_list_nft(account: Account, app_client: PersonalMarketplaceClient) -> None:
    """Tests the list_nft() and price() methods."""

    asset_id = _list_nft(account, app_client)

    # Check app account balance of NFT
    assert (
        app_client.algod_client.account_asset_info(app_client.app_address, asset_id)["asset-holding"]["amount"] == 1
    ), "NFT not transferred to app address"

    # Check box value (price) via Algod
    assert int.from_bytes(
        base64.b64decode(
            app_client.algod_client.application_box_by_name(app_client.app_id, asset_id.to_bytes(8, "big"))[
                "value"
            ]
        ),
        "big",
    ) == algos_to_microalgos(7_500), "NFT not listed"

    # Check box value (price) via app call
    assert app_client.price(
        nft=asset_id,
        transaction_parameters=TransactionParameters(boxes=[(0, asset_id)]),
    ).return_value == algos_to_microalgos(7_500), "Incorrect price returned from price() method"


def test_purchase_nft(account: Account, app_client: PersonalMarketplaceClient) -> None:
    """Tests the purchase_nft() method."""

    asset_id = _list_nft(account, app_client)

    # Generate new account to simulate buyer
    buyer = get_account(app_client.algod_client, "buyer")
    # Opt buyer in to receive the asset
    opt_in(app_client.algod_client, buyer, asset_ids=[asset_id])

    creator = app_client.creator().return_value
    assert creator == account.address, "Incorrect creator account"

    sale_price = app_client.price(
        nft=asset_id, transaction_parameters=TransactionParameters(boxes=[(0, asset_id)])
    ).return_value

    ptxn = PaymentTxn(
        sender=buyer.address,
        sp=app_client.algod_client.suggested_params(),
        receiver=account.address,
        amt=sale_price,
    )

    parameters = EnsureBalanceParameters(
        account_to_fund=buyer,
        min_spending_balance_micro_algos=sale_price + 100_000 + 2_000,  # price + MBR for new asset + fee
    )

    ensure_funded(app_client.algod_client, parameters)

    sp = app_client.algod_client.suggested_params()
    sp.fee = 2_000
    sp.flat_fee = True

    response = app_client.purchase_nft(
        nft=asset_id,
        payment=TransactionWithSigner(ptxn, AccountTransactionSigner(buyer.private_key)),
        transaction_parameters=TransactionParameters(
            suggested_params=sp, boxes=[(0, asset_id)], foreign_assets=[asset_id], accounts=[buyer.address]
        ),
    )

    assert (
        app_client.algod_client.account_asset_info(buyer.address, asset_id)["asset-holding"]["amount"] == 1
    ), "NFT not transferred to buyer"

    indexer_client = get_indexer_client()

    time.sleep(5)  # Wait for indexer to catch up
    logs = [
        item["logs"][0]
        for item in indexer_client.application_logs(app_client.app_id, min_round=response.confirmed_round)["log-data"]
        if item["logs"]
    ]

    sale_events = [log for log in logs if _is_sale_event(log)]
    assert sale_events, "No sale events found in application logs"

    last_sale_event = sale_events[-1]
    codec = abi.TupleType.from_string("(uint64,uint64,uint64,address,uint64,uint64)")
    decoded = codec.decode(base64.b64decode(last_sale_event)[4:])
    event = SaleEvent(**dict(zip(SaleEvent.__annotations__.keys(), decoded)))

    assert event.asset_id == asset_id, "Incorrect asset ID in sale event"
    assert event.listed_price == algos_to_microalgos(7_500), "Incorrect listed price in sale event"
    assert event.amount_paid == sale_price, "Incorrect amount paid in sale event"
    assert event.buyer == buyer.address, "Incorrect buyer in sale event"
    assert event.processed_round == response.confirmed_round, "Incorrect processed round in sale event"