MarketplaceOnStatefun is the Statefun port of Online Marketplace, the application prescribed as part of a microservice-based benchmark of same name being designed by the Data Management Systems (DMS) group at the University of Copenhagen. Further details about the benchmark can be found in the benchmark driver repository.
- Docker
- Docker Compose
- JDK 8 (if you want to modify the source code)
- Curl (if you want to play with the APIs)
Statefun provides a runtime to program functions that necessitate managing state as part of their execution. It is built on top of Apache Flink and inherits all the benefits brought about by the stream processing engine. We highly recommend starting from the Statefun documentation and project examples, that can be found in the Stateful Functions Playground repository.
This port runs based on the project Statefun Playground. This decision is made to facilitate the packaging of dependencies, the deployment scheme, and the submission of workload and collection of performance metrics through HTTP endpoints.
If you are interested on deploying MarketplaceOnStatefun for reproducing an experiment, please refer to the next block.
To get up MarketplaceOnStatefun up and running, run the following commands in the project's root folder:
docker-compose build
docker-compose up
The original statefun-playground Docker image runs with default Flink parameters, which can lead to performance issues. To circumvent this shortcoming, we suggest advanced users two options:
(a) Modify Flink configuration and generate a custom image from the source code
Flink configuration can be modified in the method createDefaultLocalEnvironmentFlinkConfiguration found in the class LocalEnvironmentEntryPoint
An example configuration is provided below.
final Configuration flinkConfiguration = new Configuration();
flinkConfiguration.set(StateBackendOptions.STATE_BACKEND, "hashmap");
flinkConfiguration.set(StateBackendOptions.LATENCY_TRACK_ENABLED, false);
// task slots >= parallelism
ConfigOption<Integer> NUM_TASK_SLOTS = ConfigOptions.key("taskmanager.numberOfTaskSlots").intType().defaultValue(1);
flinkConfiguration.set(NUM_TASK_SLOTS, 6);
ConfigOption<Integer> PAR_DEFAULT = ConfigOptions.key("parallelism.default").intType().defaultValue(1);
flinkConfiguration.set(PAR_DEFAULT, 6);
ConfigOption<Integer> ASYNC_MAX_DEFAULT = ConfigOptions.key("statefun.async.max-per-task").intType().defaultValue(1024);
flinkConfiguration.set(ASYNC_MAX_DEFAULT, 16000);
return flinkConfiguration;
(b) Overwrite the Flink parameters.
By opting for (a) or (b), then proceed as follows:
In the flink-statefun-playground root's folder, run:
docker build -t flink-statefun-playground-custom .
Then go to MarketplaceOnStatefun root's folder and run:
docker-compose -f docker-compose-custom.yml build
docker-compose -f docker-compose-custom.yml up
After these commands, the application is probably up and running.
The file app.properties
defines entries that refer to configuration parameters. These are loaded dynamically on application startup. The parameters are:
logging=[true/false]
: Defines whether logging historical records to PostgreSQL is enabled.
connection_string
, user
, and password
: PostgreSQL connection parameters.
num_shipments=[1-N]
: Defines the number of shipment functions.
Let's start adding a product to the Marketplace
curl -X PUT -H "Content-Type: application/vnd.marketplace/UpsertProduct" -d '{"seller_id": "1", "product_id": "1", "name" : "productTest", "sku" : "sku", "category" : "categoryTest", "description": "descriptionTest", "price" : 10, "freight_value" : 0, "version": "0"}' localhost:8090/marketplace/product/1-1
Let's send a GET request to verify whether the function have successfully stored the state
curl -X PUT -H "Content-Type: application/vnd.marketplace/GetProduct" -d '{}' localhost:8090/marketplace/product/1-1
Now let's get the results of the requests sent so far:
curl -X GET localhost:8091/receipts
If everything worked out, you should see an output like it follows.
Product{product_id=1, seller_id=1, name='productTest', sku='sku', category='categoryTest', description='descriptionTest', price=10.0, freight_value=0.0, status='approved', version='0', createdAt=2023-10-24T14:46:57.656, updatedAt=2023-10-24T14:46:57.656}
There are two ways we can update a product: updating the price or overwriting a product.
To submit a price update, a user has to send the following request
curl -X PUT -H "Content-Type: application/vnd.marketplace/UpdatePrice" -d '{ "sellerId" : 1, "productId" : 1, "price" : 100, "instanceId" : 1 }' localhost:8090/marketplace/product/1-1
That will lead to the following result
{ "tid" : "1", "type" : "PRICE_UPDATE", "actorId" : 1, "status" : "SUCCESS", "source" : "product"}
Querying the product again, we can see the updated price
Product{product_id=1, seller_id=1, name='productTest', sku='sku', category='categoryTest', description='descriptionTest', price=100.0, freight_value=0.0, status='approved', version='0', createdAt=2023-10-25T14:45:22.388, updatedAt=2023-10-25T14:45:40.637}
To overwrite a product, the client just needs to send a UpsertProduct payload like shown before.
Let's add now the corresponding stock for this product and repeat the steps above, now for stock
curl -X PUT -H "Content-Type: application/vnd.marketplace/SetStockItem" -d '{"seller_id": "1", "product_id": "1", "qty_available" : 10, "qty_reserved" : 0, "order_count" : 0, "ytd": 0, "data" : "", "version": "0"}' localhost:8090/marketplace/stock/1-1
curl -X PUT -H "Content-Type: application/vnd.marketplace/GetStockItem" -d '{}' localhost:8090/marketplace/stock/1-1
You should get a result like below
StockItem{product_id=1, seller_id=1, qty_available=10, qty_reserved=0, order_count=0, ytd=0, data='', version='0', createdAt=2023-10-24T14:43:19.741, updatedAt=2023-10-24T14:43:19.741}
Now let's move on to an actual customer interaction by adding an item to a cart and checking out afterwards.
First, let's add a customer to our marketplace
curl -X PUT -H "Content-Type: application/vnd.marketplace/SetCustomer" -d '{"id": 1, "first_name": "firstNameTest", "last_name" : "lastNameTest", "address": "addressTest", "complement": "complementTest", "birth_date": "birthDateTest", "zip_code": "zipCodeTest", "city": "cityTest", "state": "stateTest", "card_number": "cardNumberTest", "card_security_number": "cardSecNumberTest", "card_expiration": "cardExpirationTest", "card_holder_name": "cardHolderNameTest", "card_type": "cardTypeTest", "data": ""}' localhost:8090/marketplace/customer/1
curl -X PUT -H "Content-Type: application/vnd.marketplace/GetCustomer" -d '{}' localhost:8090/marketplace/customer/1
After submitting the above commands, you may get a response like from receipts:
Customer{id=1, firstName='firstNameTest', lastName='lastNameTest', address='addressTest', complement='complementTest', birthDate='birthDateTest', zipCode='zipCodeTest', city='cityTest', state='stateTest', cardNumber='cardNumberTest', cardSecurityNumber='cardSecNumberTest', cardExpiration='cardExpirationTest', cardHolderName='cardHolderNameTest', cardType='cardTypeTest', data='', successPaymentCount=0, failedPaymentCount=0, deliveryCount=0}
Next, let's make sure we add a cart item that actually exists in the stock
curl -X PUT -H "Content-Type: application/vnd.marketplace/AddCartItem" -d '{"SellerId": "1", "ProductId": "1", "ProductName" : "test", "UnitPrice" : 10, "FreightValue" : 0, "Quantity": 3, "Voucher" : 0, "Version": "0"}' localhost:8090/marketplace/cart/1
Again, to make sure our cart has successfully accounted for the cart add item, we can verify by sending a GET request to the cart function
curl -X PUT -H "Content-Type: application/vnd.marketplace/GetCart" -d '{}' localhost:8090/marketplace/cart/1
That will give us an output like below (through the GET receipts request)
CartState{status=OPEN, items=CartItem{sellerId=1, productId=1, productName='test', unitPrice=10.0, freightValue=0.0, quantity=3, voucher=0.0, version='0'}, createdAt=2023-10-24T14:57:25.753, updatedAt=2023-10-24T14:57:25.753}
Let's checkout the customer cart with the following command:
curl -X PUT -H "Content-Type: application/vnd.marketplace/CustomerCheckout" -d '{"CustomerId": 1, "FirstName": "customerTest", "LastName" : "test", "Street" : "test", "Complement" : "test", "City" : "test", "State" : "test", "ZipCode" : "test", "PaymentType" : "BOLETO", "CardNumber" : "test", "CardNumber" : "test", "CardHolderName" : "test", "CardExpiration" : "test", "CardSecurityNumber" : "test", "CardBrand" : "test", "Installments" : 1, "instanceId" : "1" }' localhost:8090/marketplace/cart/1
The result of the checkout can be verified in the receipts' endpoint and it will be something like below
{ "tid" : "1", "type" : "CUSTOMER_SESSION", "actorId" : 1, "status" : "SUCCESS", "source" : "shipment"}
Customers can query their submitted order and understand their status
curl -X PUT -H "Content-Type: application/vnd.marketplace/GetOrders" -d '{}' localhost:8090/marketplace/order/1
That will result in the following payload
Order{id=3, customerId=1, status=READY_FOR_SHIPMENT, invoiceNumber='1-2023-10-25T14:34:40.595-3', purchaseTimestamp=2023-10-25T14:34:40.530, created_at=2023-10-25T14:34:40.595, updated_at=2023-10-25T14:34:40.778, paymentDate=null, delivered_carrier_date=null, delivered_customer_date=null, countItems=1, totalAmount=30.0, totalFreight=0.0, totalIncentive=0.0, totalInvoice=30.0, totalItems=30.0, data=''}
In a marketplace, sellers play a central role by offering varied products to customers. We can check how well a seller is doing by getting an overview of sales. *The results will be provided in binary format that requires being properly deserialized by the client.
curl -X PUT -H "Content-Type: application/vnd.marketplace/QueryDashboard" -d '{"tid" : 1}' localhost:8090/marketplace/seller/1
To verify the pending shipments, that is, the orders that have not been delivered yet, you can query the shipment function
curl -X PUT -H "Content-Type: application/vnd.marketplace/GetShipments" -d '{"customerId" : 1}' localhost:8090/marketplace/shipment/0
Resulting in something like below
Shipment{shipmentId=1, orderId=1, customerId=1, packageCount=1, totalFreight=0.0, firstName='customerTest', lastName='test', street='test', zipCode='test', status=APPROVED, city='test', state='test', requestDate=2023-10-25T10:14:09.729}
It may be the case that the delivery company informs that some of the packages are delivered. They do it through the following API:
curl -X PUT -H "Content-Type: application/vnd.marketplace/UpdateShipment" -d '{"tid" : 1}' localhost:8090/marketplace/shipmentProxy/1
That will result in the output like it follows
{ "tid" : "1", "type" : "UPDATE_DELIVERY", "actorId" : 1, "status" : "SUCCESS", "source" : "shipment"}
After we update the open orders to delivered, we can see results are no longer shown in the seller dashboard
You can modify the source code to add new functionality. For instance, you can try adding stock to some item.
After modifying the code, you can perform a hot deploy by running the following command (make sure statefun-playground container has not been stopped):
docker-compose up -d --build marketplace
There is a suite of tests available for checking some Online Marketplace benchmark constraints. The tests can be found in the following path: link