Streaming recommendation is one of the most challenging topics in the field of recommender systems. The task requires recommendation engine to incrementally and promptly update recommendation model as new user-item interaction comes in to data streams (e.g., click, purchase, watch). I previously studied such incremental recommendation techniques, and eventually published a Python library named FluRS to make their implementation and evaluation easier.
As I described in my research paper a couple of years ago, incremental recommender systems run in three steps:
- System recommends top-N items to a user by using a production recommendation model
- User may interact with one or more recommended items
- Recommendation model is incrementally updated based on the observed user-item interactions
In order to provide an implementation idea of the above recommender systems, this article examines Faust, a handy stream and event processing engine for Python, in combination with FluRS. Faust allows our Python code to easily consume data streams and do something for incoming events.
pip install faust
Updating FluRS recommender from a Faust processor
Assume that a dummy Kafka topic
flurs-events continuously receives MovieLens rating events represented by pairs of
<user, item, rating, timestamp>. In case that those events are JSON-serialized, a Faust event processor can be defined as:
import json import faust app = faust.App( 'flurs-recommender', broker='kafka://localhost:9092', value_serializer='raw', ) topic = app.topic('flurs-events', value_type=bytes) async def process(stream): async for obj in stream: event = json.loads(obj) # do something awesome
In Faust, the
@app.agent(topic) decorator enables creating a processor, and what we have to do is just writing a Python function that does something for every single events from a stream. Since our application is a streaming recommender system, the function should update recommendation model against an observed event:
async def process(stream): async for obj in stream: event = json.loads(obj) # focus only on positive feedback (i.e., rating is greater than 3) if event['rating'] < 3: continue # user (item) index = event['user'] (event['item']) - 1 user, item = User(event['user'] - 1), Item(event['item'] - 1) recommender.update(Event(user, item))
It should be noticed that this processor corresponds to Step 3 in the above figure of streaming recommendation.
On the latest version of FluRS,
Event classes wrap user/item instances with their indices and features. For instance,
recommender can be a matrix factorization model that is preliminary initialized as follows:
from flurs.data.entity import User, Item, Event from flurs.recommender import MFRecommender recommender = MFRecommender(k=40) recommender.initialize() n_user, n_item = 943, 1682 for u in range(1, n_user + 1): recommender.register(User(u - 1)) for i in range(1, n_item + 1): recommender.register(Item(i - 1))
The code deterministically registers all possible instances to the recommender beforehand, since we already know total number of users and items (i.e., movies) in the public dataset.
Running the streaming recommendation engine
Eventually, our stream recommender,
recommender.py, can be implemented and executed as:
from flurs.data.entity import User, Item, Event from flurs.recommender import MFRecommender import json import faust app = faust.App( 'flurs-recommender', broker='kafka://localhost:9092', value_serializer='raw', ) topic = app.topic('flurs-events', value_type=bytes) recommender = MFRecommender(k=40) recommender.initialize() n_user, n_item = 943, 1682 for u in range(1, n_user + 1): recommender.register(User(u - 1)) for i in range(1, n_item + 1): recommender.register(Item(i - 1)) async def process(stream): async for obj in stream: event = json.loads(obj) if event['rating'] < 3: continue user, item = User(event['user'] - 1), Item(event['item'] - 1) recommender.update(Event(user, item))
faust -A recommender worker -l info
Note that the easiest way to set up a local Kafka environment on Mac would be:
brew install zookeeper kafka brew services start zookeeper brew services start kafka
Producing MovieLens rating events to the Kafka topic
Here, the following code,
producer.py, produces toy events to the dummy topic:
import sys import json from kafka import KafkaProducer from kafka.errors import KafkaError producer = KafkaProducer( bootstrap_servers='localhost:9092', value_serializer=lambda m: json.dumps(m).encode('ascii')) topic = 'flurs-events' keys = ['user', 'item', 'rating', 'timestamp'] with open(sys.argv, 'r') as f: # /path/to/ml-100k/u.data for line in f.readlines(): event = dict(zip(keys, map(int, line.rstrip().split('\t')))) future = producer.send(topic, event) try: future.get(timeout=10) except KafkaError as e: print(e) break
python producer.py /path/to/ml-100k/u.data
The producer simulates Step 2 in the flow of incremental recommendation, where user-item interaction is observed and emitted to a core recommendation engine.
producer.py communicates with
recommender.py via Kafka and Faust, and rating events are fed to the processor.
Thanks to the easy-to-use Python-based streaming processing engine, I was able to provide mock implementation of streaming recommender system by using FluRS. Below I list some open questions we need to consider more closely:
- How to deal with unknown total number of users and items
- We preliminary registered total number of users and items to initialize a recommender, but those numbers are normally unknown at the time of initialization in practice.
- How to choose appropriate serialization format
- JSON-serialization is not always the best choice.
- Event records passing through data streams should be as much as simple for efficiency.
- If we directly pass
flurs.data.entity.Eventobject to a stream, we need to implement a dedicated serializer and deserializer of the custom object.
- How to integrate recommendation logic
- Implementation of Step 1 in the above figure, where a recommendation model is hold and recommendation is actually conducted, is not obvious at this moment.
Based on these observations, I will improve the package to make it more feasible.Tweet