This is a simple stream setup that uses Memgraph to ingest real-time data from a simulated online store. Data is streamed via Redpanda and Pulsar.
You will need:
- Docker
- Docker Compose (included with Docker Desktop on Windows and macOS)
1. First, remove any containers that may still be running from a previous session:
docker-compose rm -fs
2. Build all the needed images:
docker-compose build
3. Start the Redpanda and Apache Pulsar services:
docker-compose up -d core
4. Start the data stream:
docker-compose up stream
5. Start Memgraph:
docker-compose up memgraph-mage
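With Memgraph running, connect to it (for example, with Memgraph Lab or mgconsole) and run the following Cypher queries to set up the streams:
1. First, create a stream for consuming product views: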
CREATE PULSAR STREAM views
TOPICS views
TRANSFORM ecommerce.view
SERVICE_URL "pulsar://pulsar:6650";
2. Create a second stream to consume product ratings:
CREATE KAFKA STREAM ratings
TOPICS ratings
TRANSFORM ecommerce.rating
BOOTSTRAP_SERVERS "redpanda:29092";
3. Now, we can start the streams:
START ALL STREAMS;
4. Check if the streams are running correctly:
SHOW STREAMS;
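The TRANSFORM clauses above name procedures in a Python query module called ecommerce, which turn each incoming message into a Cypher query with parameters. As a rough illustration of that conversion for the ratings topic, here is a minimal sketch. The JSON field names (`user_id`, `product_id`, `rating`, `timestamp`), the `id` property on Product, the query template, and the function name `rating_to_query` are all assumptions made for this example, not the module's actual code. Inside Memgraph, logic like this would sit in a function registered with the `@mgp.transformation` decorator.

```python
import json
from typing import Any, Dict, Tuple

# Hypothetical Cypher template. The labels and relationship type match the
# queries in this guide; the property names are assumed for illustration.
RATING_QUERY = (
    "MERGE (u:User {id: $user_id}) "
    "MERGE (p:Product {id: $product_id}) "
    "MERGE (u)-[r:RATED]->(p) "
    "SET r.rating = $rating, r.timestamp = LocalDateTime($timestamp)"
)


def rating_to_query(payload: bytes) -> Tuple[str, Dict[str, Any]]:
    """Decode one JSON-encoded message and return (query, parameters)."""
    message = json.loads(payload.decode("utf-8"))
    parameters = {
        # Ids are stored as strings, as in the recommendation query (id: "1").
        "user_id": str(message["user_id"]),
        "product_id": str(message["product_id"]),
        "rating": float(message["rating"]),
        "timestamp": message["timestamp"],
    }
    return RATING_QUERY, parameters


if __name__ == "__main__":
    # Invented sample message, only to show the shape of the result.
    sample = json.dumps(
        {"user_id": 1, "product_id": 7, "rating": 4, "timestamp": "2020-07-01T12:00:00"}
    ).encode("utf-8")
    query, params = rating_to_query(sample)
    print(query)
    print(params)
```

Passing values as parameters rather than formatting them into the query string keeps the query text constant and avoids quoting problems with user-supplied data.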
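Once the streams are running, you can explore the ingested data with queries such as the following:
- Return 10 users: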
MATCH (u:User)
RETURN u.name
ORDER BY u.name
LIMIT 10;
- Return 10 products:
MATCH (p:Product)
RETURN p.name
ORDER BY p.name
LIMIT 10;
- Return all the phones that the user named April Ludgate viewed:
MATCH (u:User)-[:VIEWED]-(p)
WHERE u.name = "April Ludgate"
RETURN p.name;
- Return all the phones that the user named Leslie Knope rated:
MATCH (u:User)-[:RATED]-(p)
WHERE u.name = "Leslie Knope"
RETURN p.name;
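- Get the average rating for each phone the user Hubert J. Farnsworth viewed (the user filter must come before OPTIONAL MATCH, otherwise it only restricts the optional part and every user's views are returned):
MATCH (u:User)-[:VIEWED]->(p)
WHERE u.name = "Hubert J. Farnsworth"
OPTIONAL MATCH (p)<-[r:RATED]-()
RETURN p.name AS viewed, avg(r.rating) AS rating
ORDER BY rating ASC;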
- Repeat the previous query, but only consider ratings made after the start of June 2020:
MATCH (u:User)-[:VIEWED]->(p)
MATCH (p)<-[r:RATED]-()
WHERE u.name = "Hubert J. Farnsworth" AND r.timestamp > LocalDateTime("2020-06-01T00:00")
RETURN p.name AS viewed, avg(r.rating) AS rating
ORDER BY rating ASC;
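To make the effect of the timestamp filter concrete, here is a small pure-Python sketch of the same aggregation over invented data: ratings newer than the cutoff are grouped by product and averaged, then sorted in ascending order. The sample ratings and the function name `average_ratings_after` are assumptions for illustration only. A product with no rating after the cutoff drops out of the result entirely, which mirrors the use of MATCH rather than OPTIONAL MATCH in the query above.

```python
from collections import defaultdict
from datetime import datetime
from typing import Dict, Iterable, List, Tuple

# (product name, rating, timestamp of the rating)
Rating = Tuple[str, float, datetime]


def average_ratings_after(
    ratings: Iterable[Rating], cutoff: datetime
) -> List[Tuple[str, float]]:
    """Average the ratings made strictly after `cutoff`, per product."""
    grouped: Dict[str, List[float]] = defaultdict(list)
    for product, rating, timestamp in ratings:
        if timestamp > cutoff:  # same strict comparison as in the query
            grouped[product].append(rating)
    averages = [(product, sum(values) / len(values)) for product, values in grouped.items()]
    # ORDER BY rating ASC
    return sorted(averages, key=lambda item: item[1])


if __name__ == "__main__":
    # Invented sample data.
    sample = [
        ("Phone A", 5.0, datetime(2020, 7, 1)),
        ("Phone A", 3.0, datetime(2020, 8, 15)),
        ("Phone A", 1.0, datetime(2020, 1, 10)),  # before the cutoff, ignored
        ("Phone B", 2.0, datetime(2020, 6, 2)),
        ("Phone C", 4.0, datetime(2019, 12, 31)),  # only rating is too old
    ]
    print(average_ratings_after(sample, datetime(2020, 6, 1)))
```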
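You can generate product recommendations for a user (here, the user with id "1") by running the query below. It ranks other users by the average difference between this user's ratings and theirs on products both have rated, keeps the first 10, and scores each product by the average rating those users gave it: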
MATCH (u:User {id: "1"})-[r:RATED]-(p:Product)
-[other_r:RATED]-(other:User)
WITH other.id AS other_id,
avg(r.rating-other_r.rating) AS similarity,
count(*) AS similar_user_count,
u.id AS user
ORDER BY similarity
LIMIT 10
WITH collect(other_id) AS similar_user_set, user
MATCH (some_product: Product)-[fellow_rate:RATED]-(fellow_user:User)
WHERE fellow_user.id IN similar_user_set
WITH some_product, avg(fellow_rate.rating) AS prediction_score, user
RETURN some_product.name AS Name, prediction_score, user
ORDER BY prediction_score DESC;
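The steps of the recommendation query can also be followed in plain Python. The sketch below mirrors them on a small in-memory dictionary: compute the average signed rating difference per co-rating user, keep the users with the lowest values, then average their ratings per product. The data layout, the sample ratings, and the function name `recommend` are assumptions for illustration. Like the query, the sketch uses the signed difference rather than its absolute value and does not exclude products the user has already rated.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# user id -> {product name -> rating}
Ratings = Dict[str, Dict[str, float]]


def recommend(ratings: Ratings, user: str, top_users: int = 10) -> List[Tuple[str, float]]:
    """Return (product, prediction_score) pairs, highest score first."""
    own = ratings[user]

    # Step 1: avg(r.rating - other_r.rating) over products both users rated.
    similarity: Dict[str, float] = {}
    for other, other_ratings in ratings.items():
        if other == user:
            continue
        shared = own.keys() & other_ratings.keys()
        if shared:
            diffs = [own[p] - other_ratings[p] for p in shared]
            similarity[other] = sum(diffs) / len(diffs)

    # Step 2: ORDER BY similarity LIMIT 10
    similar_users = sorted(similarity, key=lambda other: similarity[other])[:top_users]

    # Step 3: average the ratings given by the selected users, per product.
    scores: Dict[str, List[float]] = defaultdict(list)
    for other in similar_users:
        for product, rating in ratings[other].items():
            scores[product].append(rating)
    predictions = [(product, sum(v) / len(v)) for product, v in scores.items()]

    # ORDER BY prediction_score DESC
    return sorted(predictions, key=lambda item: item[1], reverse=True)


if __name__ == "__main__":
    # Invented sample data.
    sample = {
        "1": {"Phone A": 4.0, "Phone B": 2.0},
        "2": {"Phone A": 5.0, "Phone C": 3.0},
        "3": {"Phone B": 2.0, "Phone D": 4.5},
        "4": {"Phone E": 1.0},  # shares no product with user "1"
    }
    for name, score in recommend(sample, "1"):
        print(name, score)
```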