Recommender system design

Ted Mei
14 min read · Oct 26, 2020


(Image source: https://firebearstudio.com/blog/wp-content/uploads/2014/10/magento-personal-reccomendation-services.jpg)

This post focuses on the high-level architecture and design of a recommender system in the context of a news app. It dwells more on the system design and less on recommender-system algorithms and implementation details.

Goal

Achieve personalized recommendation over a massive number of news articles based on user actions such as clicks and archives, using a combination of online and offline computing.

Architecture

Above is the general architecture. Looking from bottom to top, HBase and Hive can be used to store raw data, and Hadoop can be used for batch processing. In the computing layer, Spark can be used to compute user and item profiles. In the matching layer, we use multi-way recall to select candidate news articles from a large corpus. Then, in the ranking layer, we can use both machine learning and deep learning models to rank the candidate articles, and finally send them back to the backend through gRPC.

Workflow

The figure above shows the general workflow. Again viewing from bottom to top: first we collect user activities; then we generate user and item profiles based on news articles and user activities, and run recall and ranking. The ranking result from offline processing, combined with online computing, is finally sent to the app server and returned to the user.

With the big picture in mind, we can dive into some design details:

Data models

In terms of raw data, there are five tables in MySQL: user_profile and user_basic relate to users, while news_article_basic, news_article_content, and news_channel relate to news articles. Details below:

user_profile: user_id (Integer, PK), gender (Integer), birthday (Date), create_time (Date), update_time (Date)

user_basic: user_id (Integer, PK), mobile (Varchar), password (Varchar), user_name (Varchar), last_login (Date)

news_article_basic: article_id (Integer, PK), user_id (Integer), channel_id (Integer), title (Varchar), create_time (Date), update_time (Date)

news_article_content: article_id (Integer, PK), content (Varchar), create_time (Date), update_time (Date)

news_channel: channel_id (Integer, PK), channel_name (Varchar), create_time (Date), update_time (Date). Note: a channel is like a topic, such as tech or business.

Data Migration

The above five tables hold our original data, and it is not good practice to operate directly on it while doing recommendation. So our first step is to migrate the original data. In our case it is stored in MySQL, and we use Sqoop to migrate it from MySQL to Hive.

In general there are two ways to migrate data through Sqoop. One is to migrate all the data at once; the other is to migrate only the data that changed within a certain time period. We choose the second option since it is computationally more efficient. In Sqoop, we can specify incremental mode and give a timestamp, so it will only migrate rows whose timestamp is newer than the given value.

Another challenge is that Sqoop currently does not support incremental mode when migrating data from MySQL directly to Hive. A workaround is to import the data into HDFS and point a Hive external table at that HDFS location.

Since the original data changes all the time, we can use crontab to run the migration at a fixed interval (say every 30 minutes) so that the data used in our recommendation stays fresh.
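As a rough sketch, the incremental import can be driven from Python and scheduled with crontab. The connection string, credentials, warehouse path, and the choice of update_time as the check column below are all assumptions:

```python
import subprocess

def incremental_import(table, last_value):
    """Pull rows of `table` whose update_time is newer than `last_value`."""
    subprocess.run([
        "sqoop", "import",
        "--connect", "jdbc:mysql://mysql-host/news_db",  # placeholder host/db
        "--username", "root", "--password", "secret",    # placeholder credentials
        "--table", table,
        "--target-dir", f"/user/hive/warehouse/ods_db.db/{table}",
        "--incremental", "lastmodified",  # only pull changed rows
        "--check-column", "update_time",  # timestamp column to compare against
        "--last-value", last_value,       # watermark from the previous run
        "--append",
        "-m", "1",
    ], check=True)
```

A crontab entry like `*/30 * * * * python /path/to/import_job.py` then reruns the job every 30 minutes.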

User activities collection

Besides the above five tables, we also need to collect user activities. The purpose of collecting activities such as clicks, reading time, archives, shares, and exposures (refreshes) is to understand users' behavior and preferences, and they can also be used to build the user profile. One way to collect user activities is to add event-tracking code in the app, so that when a user clicks an article, for instance, an event log is generated. This requires collaboration between project managers and engineers to determine which events to track, and an integration between front end and back end to generate and store all the logs.

Flume can be used to import the event logs into Hive. Flume runs all the time, so event logs are continuously imported into Hive. Supervisor can be used to manage the Flume process: if the process gets stuck, Supervisor restarts it automatically.

Item/Article Profile

The main idea of the item profile is to tag articles with keywords and topic words. For simplicity, keywords can be defined as the top k words scored by TextRank (weighted by IDF, as detailed below), while topic words can be defined as the intersection of the top k words from TextRank and the top k from TF-IDF. Keywords and topic words together represent the article.

To start with, we can combine the information in news_article_basic, news_article_content, and news_channel to get a single unified representation of each article, tokenize it, and remove stop words. Then CountVectorizerModel and IDF in Spark can be used to compute the TF-IDF of every word (a minimal sketch follows the schema below). The TF-IDF result can be stored in a Hive table named tfidf_keywords_values with the following schema:

article_id (Integer), channel_id (Integer), keyword (String), tfidf (Double)
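Here is a minimal PySpark sketch of the TF-IDF step, assuming a DataFrame words_df with columns (article_id, channel_id, words), where words is the tokenized, stop-word-free token list (the names are assumptions):

```python
from pyspark.ml.feature import CountVectorizer, IDF

# Term frequency: per-article counts over a bounded vocabulary
cv = CountVectorizer(inputCol="words", outputCol="tf", vocabSize=200000)
cv_model = cv.fit(words_df)
tf_df = cv_model.transform(words_df)

# Inverse document frequency on top of the counts, yielding TF-IDF vectors
idf = IDF(inputCol="tf", outputCol="tfidf")
idf_model = idf.fit(tf_df)
tfidf_df = idf_model.transform(tf_df)
```

The sparse tfidf column can then be exploded into (article_id, channel_id, keyword, tfidf) rows and written into the Hive table above.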

Besides TF-IDF, we can also compute TextRank over the article content. The idea of TextRank is to treat each word as a node, with an edge from each center word to the context words within a window; we then score each word using PageRank (see the sketch after the schema below). We can store the top k words with the highest scores per article into Hive with the following schema:

article_id (Integer), channel_id (Integer), keyword (String), textRankScore (Double)
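A toy TextRank sketch over a single article's token list, using networkx's PageRank; the window size and top-k value are illustrative:

```python
import networkx as nx

def textrank_keywords(words, window=5, top_k=10):
    """Rank the words of one document by PageRank over a co-occurrence graph."""
    graph = nx.Graph()
    for i, center in enumerate(words):
        # Connect the center word to every context word inside the window
        for context in words[i + 1 : i + window]:
            if center != context:
                graph.add_edge(center, context)
    scores = nx.pagerank(graph)
    return sorted(scores.items(), key=lambda kv: kv[1], reverse=True)[:top_k]
```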

Finally we can combine the results from TF-IDF and TextRank to form our article profile (article_profile table) with schema as follows:

article_id (Integer), channel_id (Integer), keywords (Map<String, Double>), topic_words (Array<String>)

where keywords are the top k words with the highest IDF * TextRank scores, and topic words are the intersection of the top k words by TF-IDF and the top k words by TextRank.

For newly arriving articles, say every hour, we can easily compute TF and TextRank per article. But since IDF involves all articles, we can approximate: if the batch of new articles is small, we assume the IDF of every word stays the same; otherwise, we recompute IDF over all articles.

Apscheduler is a good tool to schedule Python code to run at a fixed interval, and Supervisor can again be used to manage the process.

Article Similarity

We need article similarity because it lets us recommend similar articles to users (the content recall in a later section), and it also helps with the cold-start problem.

One way to compute article similarity is to get the word vector of every word in an article; the average word vector then represents the article, a.k.a. the document vector. Spark has a built-in library to compute word vectors, and it is also not hard to implement word2vec in PyTorch (more detail can be seen in my other post here). In our case, we can first group articles by channel, since different channels represent different topics, and then train word vectors on the articles within each channel.
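For example, with Spark's built-in Word2Vec (the DataFrame and column names are assumptions):

```python
from pyspark.ml.feature import Word2Vec

# words_df: one channel's articles, with a tokenized `words` column
w2v = Word2Vec(vectorSize=100, minCount=3, inputCol="words", outputCol="vec")
model = w2v.fit(words_df)

word_vectors = model.getVectors()  # DataFrame of (word, vector)
```

Averaging each article's word vectors (optionally weighted by their TF-IDF scores) then yields the document vector.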

We use the cosine similarity between two document vectors as the similarity of the corresponding documents. Within each channel, we can compute all pairwise document similarities. If the number of articles is too large, we can first cluster documents into M groups (using K-means, hierarchical clustering, etc.) and compute pairwise similarities only within each subgroup.

The other method is LSH (locality-sensitive hashing). The basic idea of LSH is that if two articles are similar in the original space, they are still likely to collide after hashing (fall into the same bucket). This extends to multiple articles: articles hashed to the same bucket can be treated as similar.
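Spark ships a BucketedRandomProjectionLSH estimator that can serve this purpose over document vectors (note it joins on Euclidean distance rather than cosine similarity; the parameters and column names are illustrative):

```python
from pyspark.ml.feature import BucketedRandomProjectionLSH

# article_vec_df: DataFrame with an `articlevector` column of document vectors
lsh = BucketedRandomProjectionLSH(inputCol="articlevector", outputCol="hashes",
                                  bucketLength=10.0, numHashTables=4)
model = lsh.fit(article_vec_df)

# Self-join: article pairs whose Euclidean distance is below the threshold
similar = model.approxSimilarityJoin(article_vec_df, article_vec_df, 2.0,
                                     distCol="distance")
```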

The final similarity results can be stored in HBase (a table named similar_article) with the rowkey being article_id, a column family named, say, "similar", and other article_ids as the column qualifiers; each cell stores the similarity value. The benefit of HBase is fast search and retrieval, especially by rowkey, which is important for real-time recommendation.
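With the happybase client, writing and reading one article's neighbors could look like this (the host and ids are placeholders):

```python
import happybase

connection = happybase.Connection("hbase-host")  # placeholder host
table = connection.table("similar_article")

# rowkey = article_id; qualifiers under "similar" are neighbor article ids
table.put(b"13090", {b"similar:13099": b"0.82",
                     b"similar:13310": b"0.79"})

neighbors = table.row(b"13090")  # fast point lookup by rowkey
```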

For newly added articles, it's better to recompute article similarity using both old and new articles together and update the results in HBase.

User Profile

After the article profile and article similarity, the next part is the user profile. The user profile helps achieve personalized, or at least more precise, recommendation.

At a high level, a user profile includes basic information such as name, age, and gender, but the more important part is user labels. We can assign various labels to a user based on the collected user events.

User profiles can be stored in HBase with the rowkey being user_id: one column family stores the user's basic information, and another stores the user labels. A Hive table can be linked to the HBase user profile table to serve offline queries. User labels are nothing but the topic words of articles the user interacted with, where interactions are the events we track, such as click, share, and collect. To determine the weight of each label, we can add rules based on our business logic: for instance, reading within 1000s adds 1 point, reading beyond 1000s adds 2 points, a share contributes 3 points, and so on. To avoid weights accumulating over all historical data, we can apply weight decay, for example letting the label weight decay by 1/(log(t)+1), where t is the time elapsed since the event was triggered.
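A minimal sketch of such a weighting rule; the event names and point values just mirror the examples above:

```python
import math

# Base points per tracked event type (values follow the rules sketched above)
ACTION_POINTS = {"read_short": 1, "read_long": 2, "share": 3}

def label_weight(action, t_hours):
    """Base points decayed by 1/(log(t)+1); t is floored at 1 to keep log >= 0."""
    return ACTION_POINTS[action] / (math.log(max(t_hours, 1.0)) + 1.0)
```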

For newly added data, we can run the same process as above and insert the new results into HBase. Apscheduler and Supervisor can again schedule the periodic execution and manage the running process.

Recall and Ranking Overview

Below is an overview of the recall and ranking steps. Based on the user and article profiles, we use multi-way recall to select candidate sets. Then we apply ML or DL models to predict the click-through rate and recommend the top K results.

In terms of storage, we can store the recall results in HBase (a table named articles_recall) with the following schema:

user_id (Integer, rowkey), channel_id (Integer), plus three column families (model_recall, content_recall, and online), each with recommended article ids as column qualifiers; every column family can keep multiple timestamped versions and a TTL.

Adding versions and a TTL to each column family lets us keep multiple versions of the results while old results are dropped after a certain amount of time.
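In happybase terms, creating articles_recall with versioned, expiring column families might look like this (the version counts and TTLs are illustrative):

```python
import happybase

connection = happybase.Connection("hbase-host")  # placeholder host
connection.create_table("articles_recall", {
    "model_recall":   dict(max_versions=999, time_to_live=7 * 24 * 3600),
    "content_recall": dict(max_versions=999, time_to_live=7 * 24 * 3600),
    "online":         dict(max_versions=999, time_to_live=24 * 3600),
})
```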

To avoid recommending duplicate content, we also need a historical_recall table in HBase (acting as a recall filter) that stores articles we have already recommended. When new candidates come in, we filter out the already-recommended articles and rank only the rest.

For each type of recall, we can use Apscheduler and Supervisor to schedule the periodic execution and manage the running processes.

Model Recall (Offline)

For model_recall, we can use the ALS (alternating least squares) algorithm, a common matrix factorization method. For each user, ALS predicts the probability that the user clicks/shares/collects articles they haven't seen; we store the top K non-duplicate articles in the HBase table articles_recall (column family model_recall, as stated above). In the meantime, we add the newly recommended articles to the historical_recall table.
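A minimal PySpark ALS sketch; the DataFrame, column names, and hyperparameters are assumptions:

```python
from pyspark.ml.recommendation import ALS

# behavior_df: (user_id, article_id, clicked) rows built from the event logs
als = ALS(userCol="user_id", itemCol="article_id", ratingCol="clicked",
          implicitPrefs=True, rank=10, regParam=0.01)
model = als.fit(behavior_df)

user_recs = model.recommendForAllUsers(20)  # top-20 candidate articles per user
```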

Content Recall (Offline)

The idea of content recall is to recommend the top K articles most similar to the articles a user has already clicked/shared/collected. In the Article Similarity section above, we created a similar_article table with the rowkey being article_id and similar articles in a column family, so we can read the top K similar articles directly from that table. Finally, we save the results into the articles_recall table (column family content_recall) and the historical_recall table.

Streaming processing

The main goal of streaming processing is to mitigate the cold-start problem for new users; it also provides up-to-date feedback on users' actions.

In the User activities collection section above, Flume is used to import event logs into Hive. For streaming computing, Flume can also send the event logs to Kafka, and Spark Streaming consumes the Kafka messages. For each article being clicked/shared/collected, we query the similar_article table for the top K similar articles and store the results in articles_recall (column family online). We also update historical_recall to avoid duplicate recommendations.

Popularity recall

Popularity recall is part of the streaming process as well. For each article being clicked/shared/collected, we increment that article's counter by 1, so popular articles are the ones with high counter values. Redis is used to store this information: ZINCRBY increments the member article_id in the key {channel_id}:hot by 1.
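With redis-py, the counter update per event is a single call; the host and channel id are placeholders, and the key format follows the text:

```python
import redis

r = redis.StrictRedis(host="redis-host", port=6379, db=0)  # placeholder host

def on_article_event(channel_id, article_id):
    # Per-channel sorted set: member = article_id, score = popularity counter
    r.zincrby(f"{channel_id}:hot", 1, article_id)  # redis-py 3.x argument order

top_hot = r.zrevrange("18:hot", 0, 9)  # ten hottest articles in channel 18
```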

New article recall

New-article recall is also part of the streaming process. To store new article information, we can consume a new Kafka topic (named news_article). For each new article, we record its information in Redis: the key is {channel_id}:new and the value is a list of article_ids. A TTL can be set to evict outdated articles. The producer of news_article could be an offline job that scans the news_article_basic table, treating articles created within, say, the last week as new.
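The corresponding Redis writes could look like this sketch (the TTL is illustrative; note the expiry applies to the whole per-channel key):

```python
import redis

r = redis.StrictRedis(host="redis-host", port=6379, db=0)  # placeholder host

def on_new_article(channel_id, article_id):
    key = f"{channel_id}:new"     # key format from the text
    r.lpush(key, article_id)      # newest article ids at the head of the list
    r.expire(key, 7 * 24 * 3600)  # evict the channel's list after a week
```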

Ranking workflow

The figure below shows a detailed workflow of ranking:

Basically, Spark can be used offline to extract user and article features from the user and article profiles. Features can be stored in HBase for fast retrieval. One limitation of Spark is that it does not support complex ranking models, so we can use TensorFlow to train offline models. A standard workflow is to store training samples as TFRecords and train models written in TensorFlow. TensorFlow has its own support for deploying and serving models (through gRPC), so we can combine the results from Spark and TensorFlow to serve ranking requests.

Spark LR (logistic regression) for CTR prediction

In this section, we talk about how to build an LR model that predicts the probability of a user clicking an article, so that we can rank articles by this probability.

The first step is to build user and article features from the user and article profiles. Features from the user profile can be the top K labels with the highest weights; features from the article profile are the article vector and the top K keywords with the highest weights. We store the features extracted from user profiles in an HBase table named ctr_feature_user with the rowkey being user_id, and the features extracted from article profiles in a table named ctr_feature_article with the rowkey being article_id. Both tables should be updated on a schedule.

After feature extraction, we have a Spark DataFrame with columns (article_id, user_id, channel_id, article_vector, user_weights, article_weights, clicked), where article_vector, user_weights, and article_weights have the Vector type.

Next we concatenate all the features from the channel_id column through the article_weights column (Spark provides a VectorAssembler for this) and fit the concatenated features to Spark's LogisticRegression model, with the clicked column as the label.
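A sketch of the assembling and training step, with the column names as above:

```python
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

# samples_df: the DataFrame described above
assembler = VectorAssembler(
    inputCols=["channel_id", "article_vector", "user_weights", "article_weights"],
    outputCol="features")
train_df = assembler.transform(samples_df)

lr = LogisticRegression(featuresCol="features", labelCol="clicked")
lr_model = lr.fit(train_df)

scored = lr_model.transform(train_df)  # adds probability/prediction columns
```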

After the LR model is trained, we can use it to predict the probability that a user clicks each article in the recall table. To evaluate the model, we can use metrics such as precision, recall, F1 score, and AUC (area under the ROC curve).

The prediction results are written into two HBase tables, wait_recommend and history_recommend. The wait_recommend table stores articles that haven't been recommended yet; the schema could be:

user_id (Integer, rowkey), channel_id (Integer), and a column family whose column qualifiers are the recommended article ids.

In feed recommendation, when users scroll up, we serve articles from the wait_recommend table. If there is no content left in the table, we go through the recall and ranking process described above to generate a new list of recommended articles. This whole process is encapsulated in a gRPC call to the recommendation server, described in the next section.

The history_recommend table stores articles that have already been recommended. Its schema is the same as wait_recommend's, except that a TTL and versioning are specified, so we can bound storage and keep recommendation snapshots from different times. When users scroll down on the phone, we can then display articles from the latest timestamp to the oldest.

Wide & Deep

Besides Spark, TensorFlow provides more complex deep learning models. For input data, we can store the extracted features (article_id, user_id, channel_id, article_vector, user_weights, article_weights, clicked) stated above as TFRecords. When reading the TFRecords, we map the features into feature columns (tf.feature_column), which come in both numeric and categorical types.

The Wide & Deep model is a milestone in combining logistic regression (traditional machine learning) with deep learning. The wide side takes hand-crafted features from experienced engineers (generally categorical features and crosses of categorical features). The deep side takes both sparse and continuous features, embeds them, crosses features, and applies a neural network. The TensorFlow Estimator API has a built-in Wide & Deep model named DNNLinearCombinedClassifier; we just need to provide feature columns for the wide and deep sides.
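A sketch with the Estimator API; the feature columns, bucket counts, and layer sizes are illustrative, and the FTRL optimizer on the wide side is explained in the next section:

```python
import tensorflow as tf

# Wide side: a raw categorical feature (bucket count assumed)
channel = tf.feature_column.categorical_column_with_identity(
    "channel_id", num_buckets=25)

# Deep side: the dense article vector plus an embedding of the categorical one
article_vector = tf.feature_column.numeric_column("article_vector", shape=(100,))
channel_emb = tf.feature_column.embedding_column(channel, dimension=8)

estimator = tf.estimator.DNNLinearCombinedClassifier(
    linear_feature_columns=[channel],
    linear_optimizer="Ftrl",   # FTRL on the wide side
    dnn_feature_columns=[article_vector, channel_emb],
    dnn_optimizer="Adam",      # Adam on the deep side
    dnn_hidden_units=[128, 64],
)
```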

FTRL (Follow the regularized leader)

FTRL is an online learning optimization algorithm. The idea of FTRL is to add L1 and L2 regularization to the loss function: L1 regularization encourages sparse weights, while L2 regularization reduces model complexity and spreads the weight distribution. The benefit of FTRL is faster training on large datasets. In the Wide & Deep model, the wide side can be trained with FTRL while the deep side is trained with the Adam optimizer.

TF Serving

To deploy and serve the model online, we can use TF Serving. The first step is to specify the input feature columns of our trained model. Next, we use the provided export_savedmodel interface to export the trained model into a directory. Finally, we use Docker to start TF Serving bound to that directory; every time a new model is exported, TF Serving automatically deploys the latest version. To send requests to the deployed service, TF Serving provides gRPC APIs such as prediction_service_pb2_grpc. The response gives the ranking of the articles.
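A client-side sketch; the model name, input key, and port are placeholders:

```python
import grpc
import tensorflow as tf
from tensorflow_serving.apis import predict_pb2, prediction_service_pb2_grpc

def rank(serialized_examples):
    """Send serialized tf.train.Example records to TF Serving, return scores."""
    channel = grpc.insecure_channel("localhost:8500")  # TF Serving gRPC port
    stub = prediction_service_pb2_grpc.PredictionServiceStub(channel)

    request = predict_pb2.PredictRequest()
    request.model_spec.name = "ctr_model"                # placeholder model name
    request.model_spec.signature_name = "serving_default"
    request.inputs["examples"].CopyFrom(                 # placeholder input key
        tf.make_tensor_proto(serialized_examples, dtype=tf.string))

    return stub.Predict(request, 10.0)  # second argument is a timeout in seconds
```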

Real time recommendation

First, we define the gRPC interface between the recommendation server and the backend server.

One possible request proto for feed recommendation contains string user_id, int32 channel_id, int32 article_num, and int64 request_timestamp. A service can then be implemented on the recommendation server that returns a list of articles for each request. More specifically, in the proto file we can define a service called UserRecommend with an rpc named user_recommend, which takes the request proto defined above and returns another proto containing the list of recommended articles.

Going deeper into the business logic of recommendation: we first read articles from model recall, content recall, popularity recall, new-article recall, and online recall. From each table we read only the top K articles and then remove them from the table. Next, we merge those articles, filtering out duplicates and articles that have already been recommended (i.e., that exist in the history_recommend table). For the remaining articles, we extract article features from the ctr_feature_article table, combine them with the user features from ctr_feature_user, and send them to the ranking model. The ranking model, trained and deployed as described in the last section, predicts the probability that the user clicks each article, so we rank articles by this probability and recommend the top K.

Since the gRPC service should be running all the time, Supervisor is used to manage the process.

To evaluate performance, we can run A/B tests. The idea is to split traffic into buckets by user_id, use different recall/ranking methods in different buckets, and compare the results.
