Real-time Marketing Automation with Distributed Actor Systems and Akka.NET

This is an archive of a blog post I wrote for the MarkedUp Analytics blog on July 23rd, 2014. It’s been a popular post and I’m posting it here in order to preserve it. I shut down MarkedUp in November, 2014.

The MarkedUp team has spent several years developing SDKs, high-performance APIs, and service-oriented software, and working with Cassandra and Hadoop. However, when we started laying out the blueprint for MarkedUp In-app Marketing, we knew it would require a radically different set of tools than the traditional stateless HTTP APIs we’d been used to developing.

Some Background

Before we dive into the guts of how the system works, it’s important to understand what the MarkedUp team is trying to achieve and why.

Most marketing automation software is borderline terrible, ineffective, or totally onerous to use. This is largely because marketing automation is an afterthought and is hardly ever integrated into systems that software companies regularly use, such as product analytics.

We wanted to build a product that made it extremely easy to:

  1. Quickly and successfully set up drip campaigns of messages, especially for users who are non-technical;
  2. Allow customers to leverage existing MarkedUp Analytics events and data;
  3. Target and segment users based on behavior AND demographics; and
  4. Measure results quickly.

Our Analytics services have a reputation for being tremendously easy to use compared to the alternatives – we wanted to create a similarly good experience for in-app marketing automation.

With that background in mind, now we can talk about how In-app Marketing works.

How In-app Marketing Works

We included the following diagram in our public launch of MarkedUp In-app Marketing, and it’s a helpful launch point for discussing the technology behind it.

[Diagram: How MarkedUp In-app Marketing Works]

The process for working with MarkedUp In-app Marketing looks like this, from the point of view of one of our users:

  1. Integrate the MarkedUp SDK and define events inside your application;
  2. Release your app with MarkedUp successfully integrated to your users and begin collecting data;
  3. Sign up for MarkedUp In-app Marketing and define campaigns of messages that are delivered based on end-user behavior;
  4. MarkedUp In-app Marketing immediately begins filtering users based on the behavior segments you defined and automatically subscribes them to any eligible campaigns.

And here’s an example of an actual behavior we can target:

“Select all users from the United States and Canada who installed after 6/1/2014, have run the app within the past two days, have had event foo, had event bar, and have viewed page Buy.”

In this case events “foo” and “bar” are some arbitrary events that are specific to a MarkedUp customer’s application – they can mean anything.
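To make that rule concrete, here is a minimal sketch of the same behavior written as a predicate over one user's data. The `UserProfile` record, its field names, and the `MatchesExampleSegment` helper are hypothetical names invented for this illustration; the real rules were expressed in MarkedUp's own filtering language, which this post does not show.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical snapshot of what is known about one app user.
public sealed record UserProfile(
    string Country,
    DateTime InstalledUtc,
    DateTime LastRunUtc,
    ISet<string> Events,
    ISet<string> PagesViewed);

public static class SegmentExample
{
    private static readonly HashSet<string> Countries = new() { "US", "CA" };

    // "Installed after 6/1/2014" is read here as "after midnight UTC on June 1, 2014".
    private static readonly DateTime InstallCutoff =
        new DateTime(2014, 6, 1, 0, 0, 0, DateTimeKind.Utc);

    public static bool MatchesExampleSegment(UserProfile user, DateTime nowUtc) =>
        Countries.Contains(user.Country)
        && user.InstalledUtc > InstallCutoff
        && nowUtc - user.LastRunUtc <= TimeSpan.FromDays(2)
        && user.Events.Contains("foo")
        && user.Events.Contains("bar")
        && user.PagesViewed.Contains("Buy");
}
```

A predicate like this needs the whole profile at once. The rest of the post is about evaluating the same kind of rule incrementally, as events stream in.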

Technical Challenges

So what’s challenging about building this product? Let’s put this in list form:

  1. [Real-time] Customers get the best results on their messages when they’re delivered immediately after a user is “subscribed” to a campaign – therefore, our segmentation system for filtering subscribers must be capable of making decisions within seconds of receiving critical events. This eliminates traditional batch processing via Hive / Hadoop queries as a feasible option – we’re going to have to process data as streams instead.
  2. [Stateful] Overwhelmingly, most campaigns customers define will require us to observe multiple events per user – this means that we have to maintain state that is incrementally completed for each user over multiple successive events. Given that this needs to be done in real-time we’re not going to read/write from a database on every request – state will probably have to be kept in-memory somewhere.
  3. [Highly Available] The system must be capable of supporting millions of parallel data streams and must be able to recover from failures – we need to have a way of throwing hardware at the problem when demand on the system increases (happens suddenly when it does) and we need to be able to recover from software and hardware failures quickly.
  4. [Remoting] The targeting system needs to be able to use the data-stream from our API in a loosely coupled fashion – we need some way of quickly sharing the fire hose of data that our API servers collect, and do it in a way that is loosely coupled enough that a downed in-app marketing server won’t disrupt the API from serving its primary function: storing customers’ data.

The real-time component is what makes this entire project a challenge – and no, it is not optional. It’s a real competitive advantage and essential to the “it just works” promise we make to our customers. We’re in the business of delivering on the promise of a “better experience than everything else” at MarkedUp.

So how in the hell were we going to build a system that was highly-available, stateful, loosely-coupled, and able to respond in real-time?

Solution: Actor Model

As with 99% of challenging technical problems faced by today’s software developers, a solution was already invented in the early 1970s: the Actor model.

The Actor model’s premise is that every component of your system is an “actor” and all actors communicate by passing immutable messages to each other. Each actor has a unique address inside an actor system, even across multiple physical computers, so it’s possible to route a message to one specific actor. Actors are also composed in hierarchies, and parent actors are responsible for supervising the child actors one level beneath them – if a child actor crashes suddenly the parent actor can make a decision about how to proceed next.
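The core idea is a private mailbox drained by a single loop. The sketch below is a toy built on `System.Threading.Channels`. It is not Akka.NET and has no addressing or supervision, and `CounterActor` is a hypothetical name used only for illustration.

```csharp
using System;
using System.Threading.Channels;
using System.Threading.Tasks;

// A toy "actor": a private mailbox plus one loop that drains it.
// It only illustrates one-message-at-a-time processing.
public sealed class CounterActor
{
    private readonly Channel<int> _mailbox = Channel.CreateUnbounded<int>();
    private int _total; // touched only by the processing loop, so no lock is needed

    // Completes with the final total once the mailbox is closed and drained.
    public Task<int> Completion { get; }

    public CounterActor() => Completion = ProcessMailboxAsync();

    // Senders never touch _total; all they can do is enqueue a message.
    public void Tell(int amount) => _mailbox.Writer.TryWrite(amount);

    // Closes the mailbox; the loop ends after processing what is already queued.
    public void Stop() => _mailbox.Writer.Complete();

    private async Task<int> ProcessMailboxAsync()
    {
        await foreach (var amount in _mailbox.Reader.ReadAllAsync())
            _total += amount; // messages are handled strictly one after another
        return _total;
    }
}
```

Many threads can call `Tell` concurrently, yet the state is only ever mutated by the single loop. That is the property the rest of this design leans on.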

The actor model appealed to us for the following reasons:

  1. Actors are cheap – you can have 10s of millions of them with minimal overhead. This means that we can have an actor to track every user / campaign tuple, of which there might be millions. This localizes the “filtering” problem nicely for us – we can define a filter that operates at the per-user level, so it’s a tiny piece of code. Sure, there might be millions of these filters running at once – but the code is highly atomized into small pieces.
  2. Remoting and addressing between actor systems makes it easy to route data for each user to a specific location – using a technique like consistent hash routing, we can push all of the state for each individual user to the same location in memory even across a large cluster of machines and do it in a way that avoids load-balancing hot-spots.
  3. Actors only process one message in their inbox at a time – therefore it’s really easy for us to process streams for individual users, since it will be done serially. This allows us to process data streams for each individual user and each in-app marketing campaign with a simple Finite State Machine.
  4. Actor hierarchies and supervision allow our software to be self-healing and highly available – the supervision aspect of actor hierarchies is immensely powerful. It allows us to make local decisions about what to do in the event of failure, and we can simply “reboot” part of our actor system automatically if something goes wrong. In the event of hardware failure, we can re-route around an unavailable node and redistribute the work accordingly.
  5. The Actor model offers a fantastically simple API for highly concurrent computing, which is exactly what we need. We’re handling thousands of parallel events for hundreds of different apps running on millions of different devices – our incoming data stream is already inherently concurrent. Being able to manage this workload in a stateful way is challenging, but the Actor model exposes a simple API that eliminates the need for us to worry about threads, locks, and the usual synchronization concerns.

There are certainly other ways we could have solved this problem and we evaluated them, but we were ultimately sold on the Actor model because of its simplicity relative to the others.
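Consistent hash routing, mentioned in point 2 of the list above, can be sketched in a few lines. This is a generic hash ring with virtual nodes, not the algorithm Akka.NET's router uses internally. `HashRing`, `NodeFor`, and the choice of FNV-1a as the hash are assumptions made for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;

public sealed class HashRing
{
    private readonly uint[] _points;   // sorted positions on the ring
    private readonly string[] _owners; // _owners[i] is the node that owns _points[i]

    public HashRing(IEnumerable<string> nodes, int virtualNodes = 100)
    {
        // Each node is placed on the ring many times to spread load evenly.
        var ring = nodes
            .SelectMany(n => Enumerable.Range(0, virtualNodes)
                .Select(i => (Point: Fnv1a($"{n}#{i}"), Node: n)))
            .OrderBy(p => p.Point)
            .ToArray();
        if (ring.Length == 0) throw new ArgumentException("At least one node is required.");
        _points = ring.Select(p => p.Point).ToArray();
        _owners = ring.Select(p => p.Node).ToArray();
    }

    public string NodeFor(string key)
    {
        var i = Array.BinarySearch(_points, Fnv1a(key));
        if (i < 0) i = ~i;                  // first ring point clockwise from the key
        return _owners[i % _owners.Length]; // wrap around past the last point
    }

    // FNV-1a is stable across processes, unlike string.GetHashCode().
    private static uint Fnv1a(string s)
    {
        unchecked
        {
            uint hash = 2166136261u;
            foreach (var b in Encoding.UTF8.GetBytes(s))
                hash = (hash ^ b) * 16777619u;
            return hash;
        }
    }
}
```

The same user ID always lands on the same node. When a node is removed, only the keys that node owned move elsewhere, which is what keeps per-user in-memory state from being reshuffled across the whole cluster.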

Distributed Actor Systems in .NET with Akka.NET

We teamed up with some other like-minded folks to develop Akka.NET – a distributed actor framework for .NET that closely follows Scala’s Akka project.

Akka.NET offers all of the important features of the Actor model in C# and F#, and a number of critical features that were essential to building MarkedUp In-app Marketing:

  1. A hefty collection of built-in router actors, such as the RoundRobinRouter and the ConsistentHashRouter – both of which include the ability to automatically scale up or down on-demand if needed (via the Resizer function.)
  2. Out of the box Finite State Machine actors, which are exactly what we need for segmenting users.
  3. Robust actor supervision and message scheduling APIs, which we use for self-terminating and remote-terminated actors.
  4. Remoting capabilities for distributing actor workloads across multiple physical systems.
  5. Highly extensible logging and actor system configuration APIs.
  6. And some pretty insane performance benchmarks (21 million messages per second) – bottom line is that the overhead of the Actor system itself probably isn’t going to be an issue for us.

We settled on Akka.NET as our framework and used it as the building blocks for the back-end of MarkedUp In-app Marketing.

[Diagram: MarkedUp In-app Marketing Network Topology]

The network topology shown above leaves out some details of our service-oriented architecture, but it covers the In-app Marketing product in its entirety.

MarkedUp has two public services exposed directly to end-users via an HTTP API:

  1. The “MarkedUp API” – which is what our analytics SDKs communicate with inside your applications; it handles millions of HTTP requests per day and does 100m+ database writes per day. Most of those writes are counter updates which are inexpensive, but the bottom line is that there’s a lot of activity going on inside this API.
  2. The Win32 Mailbox Service – a brand new service that we released as part of our In-app Marketing launch for Windows Desktop applications; all of our Win32 messaging clients work via HTTP polling since there are a number of tricky permissions issues related to keeping an open socket in the background on each version of Windows (a subject for a separate blog-post.) This is the endpoint these clients use to check for available messages.

The goal of our Targeting System is to take the streams of data directly from the MarkedUp API servers and populate mailbox messages for individual users in accordance with the app developer’s filtering rules, and we use Akka.NET to do this.

Filtering Messages, Users, and Campaigns with Actors, State Machines, and Routing

Success for MarkedUp In-app Marketing’s Targeting System is defined as “being able to subscribe a user into one or more campaigns within seconds of receiving the segmentation data specified by the customer” for N concurrent users per server, where N is a coefficient determined largely by the size of the hardware and number of potential campaigns per-user, which varies for each one of our customers’ apps.

Our product is designed to filter messages for specific users for campaigns that are specific to a customer’s app, so we reflected these relationships in our actor hierarchy.

[Diagram: MarkedUp IAM remote routers]

Data arrives at the Targeting System from the MarkedUp API via Akka.NET’s built-in remoting over TCP sockets, and we’ll get into the details in a moment. For the time being, all you need to know is that the data is routed to our API Router, a round-robin pool router actor that specializes in concurrently load-balancing requests across a number of worker actors (API Router Agents) who actually respond to the requests.

The number of workers that exist at any given time can be hard-coded to a value of N workers, or it can be resizable based on load depending on how you configure the router.

Each API Router Agent is responsible for doing one thing: making sure that data for a specific user makes it to that user’s actor. Here’s what that process looks like:

[Diagram: MarkedUp IAM actor hierarchy]

And here’s a rough idea of what the source code looks like:

public class MarkedUpApiRouterActor : UntypedActor
{
    // Selection pointing at the App Master actor, resolved once at startup.
    private ActorSelection _appIndexActor;

    protected override void PreStart()
    {
        _appIndexActor = Context.ActorSelection(ActorNames.MarkedUpAppMasterActor.Path);
    }

    protected override void OnReceive(object message)
    {
        // Dispatch on the message's C# type; every handled type carries an AppId.
        PatternMatch.Match(message)
            .With<IMKSession>(m => ForwardToRequestActor(m.AppId, m))
            .With<IUser>(m => ForwardToRequestActor(m.AppId, m))
            .With<ISessionEvent>(m => ForwardToRequestActor(m.AppId, m))
            .With<IMKLogMessage>(m => ForwardToRequestActor(m.AppId, m))
            .With<ICommercialTransaction>(m => ForwardToRequestActor(m.AppId, m))
            .Default(Unhandled); // anything else is reported as unhandled
    }

    // Wraps the message with its app ID so the App Master can route it onward.
    private void ForwardToRequestActor<T>(string appId, T message)
    {
        _appIndexActor.Tell(new ForwardOntoApp<T>(appId, message));
    }
}

The PatternMatch.Match method is used to filter messages based off of their C# type – any messages that we don’t match are “unhandled” and logged. In Akka.NET, all messages are just objects.

In terms of where we’re sending messages, we have a fixed address scheme inside our in-app marketing product that makes it easy for us to locate individual users, campaigns, and apps. Suppose we have an app called “App1” and a user called “UserA” – we use Akka.NET’s built-in address scheme to make it really easy to determine if this user already exists.

Every single actor inside Akka.NET has a unique address – expressed via an ActorUri and ActorPath, like this:

akka.tcp://<actor-system-name>@<hostname>:<port>/user/{parent}/{child}

When you’re routing messages within the in-process actor system all you really care about is the ActorPath – the /user/parent/child part.

[Diagram: MarkedUp IAM user actor hierarchy]

We constructed our actor hierarchy to include a single App Master actor (/user/apps), responsible for supervising every “App Actor” for our customers’ apps (/user/apps/{customer’s app ID}) – and every single user we ever observe inside MarkedUp is always associated with an app, so every App Actor supervises one or more “User Actors” who can be found at /user/apps/{customer’s app ID}/{userId}.
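Because the scheme is fixed, the path to any app or user actor can be computed from IDs alone. The helper below is a hypothetical sketch of that idea using plain string formatting. `ActorPaths` and its method names are not from the MarkedUp codebase, and it does not validate that IDs contain only characters legal in an actor name.

```csharp
using System;

public static class ActorPaths
{
    // The single App Master actor described above.
    public const string AppMaster = "/user/apps";

    public static string ForApp(string appId) => $"{AppMaster}/{appId}";

    public static string ForUser(string appId, string userId) => $"{ForApp(appId)}/{userId}";

    // Prefixes a local path with the address of a remote actor system,
    // in the form akka.tcp://<actor-system-name>@<hostname>:<port>/<path>.
    public static string Remote(string systemName, string hostAndPort, string localPath) =>
        $"akka.tcp://{systemName}@{hostAndPort}{localPath}";
}
```

For example, `ActorPaths.ForUser("App1", "UserA")` yields `/user/apps/App1/UserA`, the location to check when deciding whether an actor for that user already exists.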

If App Master shuts down or restarts, all of its child actors shut down or restart with it – if a child actor dies on its own, it’s up to the App Master to decide what to do next. This is the essence of how actor supervision works.

So the API Router Agent forwards the message to App Master which kicks off a process of lazily creating App and User actors on-demand, but eventually the messages do arrive inside the inbox of the User Actor.

The User Actor implements an Akka.NET Finite State Machine to determine which campaigns this user should start filtering for – this is determined by (1) which, if any, campaigns are available for this app and (2) which campaigns this user has already been subscribed to.

Here’s what the User Actor’s initial state looks like in C#:

When(UserState.Initializing, fsmEvent =>
{
    State<UserState, UserStateData> nextState = null;
    fsmEvent.FsmEvent.Match()
        .With<RecycleIfIdle>(recycle =>
        {
            nextState = IsIdle() ? Stop(new Normal()) : Stay();
        })
        .With<UserLoadResponse>(load =>
        {
            SetLastReceive();
            nextState = DetermineUserLoadedState(load);
        })
        .With<AddCampaign>(add =>
        {
            SetLastReceive();
            nextState = HandleAddCampaign(add.CampaignId);
        })
        .With<RemoveCampaign>(remove =>
        {
            SetLastReceive();
            nextState = HandleRemoveCampaign(remove.CampaignId);
        })
        .Default(m =>
        {
            SetLastReceive();
            CurrentStash.Stash();
            nextState = Stay();
        });

    return nextState;
});

After a User Actor has determined that it’s eligible to be subscribed into at least one additional campaign, it’ll change its state into a “Ready” state where it begins sending messages to the “Campaign State Actors” responsible for filtering the rules for every possible campaign this user can belong to.
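The stash-then-transition pattern can be shown without any actor framework. The sketch below is a plain C# state machine and does not use the Akka.NET FSM API. The message types `UserLoaded` and `AppEvent` are hypothetical stand-ins, and replaying stashed messages on entering Ready is an assumption about what happens to them.

```csharp
using System;
using System.Collections.Generic;

public enum UserState { Initializing, Ready }

// Hypothetical message types, standing in for the real ones.
public sealed record UserLoaded(IReadOnlyList<string> EligibleCampaigns);
public sealed record AppEvent(string Name);

public sealed class UserStateMachine
{
    private readonly Queue<AppEvent> _stash = new();
    private readonly Action<AppEvent> _sendToFilters;

    public UserState State { get; private set; } = UserState.Initializing;

    public UserStateMachine(Action<AppEvent> sendToFilters) => _sendToFilters = sendToFilters;

    public void Handle(object message)
    {
        switch (State, message)
        {
            case (UserState.Initializing, UserLoaded loaded) when loaded.EligibleCampaigns.Count > 0:
                State = UserState.Ready;
                // Replay everything that arrived while loading, in arrival order.
                while (_stash.Count > 0) _sendToFilters(_stash.Dequeue());
                break;
            case (UserState.Initializing, AppEvent early):
                _stash.Enqueue(early); // too early to filter; keep it for later
                break;
            case (UserState.Ready, AppEvent e):
                _sendToFilters(e);
                break;
            // Anything else, including a load result with no eligible campaigns,
            // is ignored in this sketch.
        }
    }
}
```

Since an actor handles one message at a time, `Handle` is never re-entered concurrently, which is why the queue and state field need no locking.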

The User Actor has three jobs:

  1. Determine which campaigns a specific app user could be subscribed to;
  2. Serialize all of the data for this user into a linear sequence, based on when the message arrived, and hand this data over to the Campaign State Actors for filtering; and
  3. Automatically shut down the User Actor if the user stops being active.

Items #1 and #2 are pretty generic, but item #3 is more interesting – how do we determine that an app user is no longer using their application?

We do this by setting a “Receive Time” that marks the UTC time a user last received a message from our API:

SetLastReceive();
SendToFilter(m);
nextState = Stay();

The SetLastReceive function sets this time value, and then we have a timer using Akka.NET’s FSM’s built-in scheduler that checks whether or not this user is still active once every 60 seconds:

StartWith(UserState.Initializing, new InitialUserStateData(_appId, _userId));
SetTimer(RecycleIfIdleTimerName, new RecycleIfIdle(), TimeSpan.FromMinutes(1), true);

If we receive a “RecycleIfIdle” message from this timer and the user is determined to be idle:

private bool IsIdle()
{
    return Math.Abs((DateTime.UtcNow - _lastReceivedMessage).TotalSeconds) >= SystemConstants.MaxUserIdleTime.TotalSeconds;
}

Then the User Actor stops itself and all of the Campaign State actors beneath it. This is how we free up memory and resources for future actors.
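The idle check itself is easy to isolate and test if the clock is injected. Below is a minimal sketch of that idea. `IdleTracker` and its constructor parameters are hypothetical and only mirror the `SetLastReceive` / `IsIdle` pair shown above.

```csharp
using System;

public sealed class IdleTracker
{
    private readonly TimeSpan _maxIdle;
    private readonly Func<DateTime> _utcNow; // injected clock, so tests can control time
    private DateTime _lastReceived;

    public IdleTracker(TimeSpan maxIdle, Func<DateTime> utcNow)
    {
        _maxIdle = maxIdle;
        _utcNow = utcNow;
        _lastReceived = utcNow(); // a freshly created tracker counts as just active
    }

    // Call whenever a message for this user arrives.
    public void SetLastReceive() => _lastReceived = _utcNow();

    // Call from the periodic timer; true means the owner should stop itself.
    public bool IsIdle() => _utcNow() - _lastReceived >= _maxIdle;
}
```

In production the clock would simply be `() => DateTime.UtcNow`. The timer interval bounds how long an idle actor can linger past its deadline.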

The Campaign State actor itself is another FSM and it communicates with a group of dedicated “Filter Actors” who process the campaign’s rules via a domain-specific language we invented for filtering. All the Campaign State actor does is process the results from the Filter Actors and save its state to Cassandra, or send messages to the user if the user’s event stream satisfies all of the requirements for a campaign.
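“State that is incrementally completed” can be pictured as a shrinking set of unmet requirements. The sketch below is a deliberate simplification: it treats a campaign as a list of event names that must all be seen at least once, which is far less expressive than the filtering DSL described above. `CampaignProgress` is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;

public sealed class CampaignProgress
{
    private readonly HashSet<string> _remaining;

    public CampaignProgress(IEnumerable<string> requiredEvents) =>
        _remaining = new HashSet<string>(requiredEvents);

    public bool IsSatisfied => _remaining.Count == 0;

    // Returns true only for the event that completes the requirements,
    // so the caller can send the campaign message exactly once.
    public bool Observe(string eventName)
    {
        if (IsSatisfied) return false;
        _remaining.Remove(eventName);
        return IsSatisfied;
    }
}
```

Each observed event does a constant amount of work against in-memory state, which is what makes per-event decisions within seconds plausible without a database round trip on every request.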

Communication between Remote Actor Systems with Akka.Remote

The MarkedUp API and the Targeting System communicate with each other via Akka’s Remoting features using a TCP socket and Google’s protocol buffers, and the MarkedUp API uses the ActorUri and ActorPath convention I showed you earlier to ensure that these messages are routed directly to the API Router Actor on the Targeting System.

public class MessagePublishingActor : UntypedActor
{
    private Config _routerConfig;
    private ActorRef _router;

    protected override void PreStart()
    {
        var config = @"routees.paths = [
                ""akka.tcp://markedup-notifications@$REPLACE$/user/api""
            ]";

        var notificationsEndpoint = ConfigurationManager.AppSettings["NotificationsEndpoint"] ?? "127.0.0.1:9991";
        config = config.Replace("$REPLACE$", notificationsEndpoint);
        _routerConfig = ConfigurationFactory.ParseString(config);
        _router = Context.ActorOf(Props.Empty.WithRouter(new RoundRobinGroup(_routerConfig)));
    }

    protected override void OnReceive(object message)
    {
        _router.Tell(message);
    }
}

The MessagePublishingActor lives inside the MarkedUp API and uses a RoundRobinGroup router to communicate with specific, named Actors on a remote system.

In production we can have several Targeting Systems serving as routees and we use a ConsistentHash router to make sure that all messages for the same user always arrive at the same server, but for the sake of brevity I rewrote this to use a single server and a RoundRobinGroup router.

A RoundRobinGroup is different from a RoundRobinPool in that the RoundRobinGroup doesn’t directly supervise or manage its routees – it forwards messages to actors that are pre-created, whereas the RoundRobinPool creates and manages the worker actors themselves.
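The “group” half of that distinction boils down to rotating over a fixed list of routees that someone else created. Here is a minimal, framework-free sketch of that selection logic. `RoundRobinSelector` is a hypothetical name and this is not how Akka.NET implements its routers.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public sealed class RoundRobinSelector<T>
{
    private readonly IReadOnlyList<T> _routees; // pre-created elsewhere; never managed here
    private int _next = -1;

    public RoundRobinSelector(IReadOnlyList<T> routees)
    {
        if (routees.Count == 0)
            throw new ArgumentException("At least one routee is required.", nameof(routees));
        _routees = routees;
    }

    public T Next()
    {
        // Interlocked keeps the counter correct when many threads route at once;
        // the cast to uint keeps the index valid after the int counter overflows.
        var n = (uint)Interlocked.Increment(ref _next);
        return _routees[(int)(n % (uint)_routees.Count)];
    }
}
```

A pool-style router would own a list like this too, but it would also create the workers and supervise them. The selector above assumes they already exist and does neither.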

The important part, however, is the addressing – using the /user/api convention, which is the valid ActorPath for the API Router Actor on the Targeting System, Akka will automatically route my messages from the API server to the Targeting System via TCP, and then Akka’s remoting internals will ensure that these messages are correctly routed to this actor.

As for the messages themselves, and this is important – both the MarkedUp API and the Targeting System share a common assembly that defines all of the message types that can be exchanged over the network. Otherwise we couldn’t deserialize any of those messages at the other end of the network connection.

Wrapping Up

Akka.NET has been a boon to our productivity, because of how simple its programming model is – instead of writing a piece of code that tries to determine campaign eligibility for 10,000 users in parallel, we can write a piece of code that makes that determination for a single user and run 10,000 instances of it with minimal overhead.

The actor model does an excellent job of atomizing code into very small parts, particularly because actors can only process one message in their inbox at a time (except for router actors.) The serial nature of message processing inherently makes everything inside an actor thread-safe, so you can store local state inside each actor instead of having to rely on a big synchronized cache or polling a distributed cache in Redis / Memcached.

We’ll have some more posts in the future about some of the other cool stuff we’re using Actors for and some of the integrations we’ve set up, such as our live debugger using SignalR and Akka.NET’s logging capabilities.

MarkedUp In-App Marketing Demo (Post-mortem)

I preserved the original MarkedUp In-App Marketing Demo on my personal YouTube channel, so you can see what this system actually did!

I'm the CTO and founder of Petabridge, where I'm making distributed programming for .NET developers easy by working on Akka.NET, Phobos, and more.