OnSubscribe

In Short

Build an RX Extension Method that calls an Action once an RX subscriber is ready and listening.

// OnSubscribed: Example usage
var subscription = sourceStream
                    .OnSubscribed(() => OnReady())
                    .Subscribe(data => Process(data));

Are You Ready Yet?

Consider an application that listens to live trade data. It might be useful to have an on-screen indicator that shows when the connection is established and the application is listening.

But how do you know when the subscription is actually established?

If the subscription is being made synchronously, then of course you know it is subscribed once the calling thread reaches the next line.

// OnSubscribed: Synchronous subscription
var subscription = sourceStream
                    .Subscribe(data => Process(data));

OnReady();

However, what if the subscription is made on a task pool thread?

// OnSubscribed: Asynchronous subscription
var subscription = sourceStream
                    .SubscribeOn(TaskPoolScheduler.Default)
                    .Subscribe(data => Process(data));
// Not ready yet…
// OnReady();

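There isn’t anywhere to put the call to OnReady now. It can’t go after the Subscribe call, because Subscribe returns before the background thread has set up the subscription. It can’t go with Process either, because that is called when an item is pumped, not when the subscription is ready.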

Building OnSubscribe

So what are our requirements for an OnSubscribed method? Simply, we need it to inform us (call our action) when the subscriber is ready to receive a message.

To write a test for this, let’s set it up such that as soon as the action is called, we pump something onto the source stream. This should be received by the subscriber. If the subscriber misses it, then it wasn’t actually ready.

[Test]
public void When_subscription_is_ready_then_should_receive_item_pumped()
{
    // ARRANGE
    var source = new Subject<Unit>();

    var receivedPump = false;

    // ACT
    source.OnSubscribed(() => source.OnNext(Unit.Default))
          .Subscribe(_ => receivedPump = true);

    // ASSERT
    Assert.IsTrue(receivedPump);
}

OnSubscribe – Version 1

One might think that we can simply call the action as soon as OnSubscribed is called.

public static IObservable<T> OnSubscribed<T>(this IObservable<T> source,
                                             Action onSubscribed)
{
    // This is wrong on a number of levels…
    onSubscribed();
    return source;
}

This executes the action when the OnSubscribed() method is called, which is when the pipeline is declared, not when anything subscribes to it. To demonstrate this:

[Test]
public void OnSubscribed_action_should_not_be_called_during_declaration()
{
    // ARRANGE
    var source = new Subject<Unit>();

    var onSubscribedCalled = false;

    // ACT
    // Note – we aren't subscribing
    source.OnSubscribed(() => onSubscribedCalled = true);

    // ASSERT
    // This assert fails.
    Assert.IsFalse(onSubscribedCalled);
}

OnSubscribe – Version 2

The code inside an Observable.Create is called during subscription. Let’s try calling our action there.

public static IObservable<T> OnSubscribed<T>(this IObservable<T> source,
                                             Action onSubscribed)
{
    return Observable.Create<T>(o =>
            {
                // This code is executed at subscription time
                onSubscribed();
                return source.Subscribe(o);
            });
}

This is slightly better. At least the action is called during subscription time. However, at the point at which the onSubscribed action is called, the subscription is not actually ready. The source.Subscribe call has not yet been made and so the pump on source is missed. Can we reverse the order of those two lines?
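The reason the pump is missed is that a Subject only pushes values to observers that are subscribed at the moment OnNext is called; nothing is stored for later. A minimal sketch of that behaviour, assuming the System.Reactive package is referenced (the HotSubjectDemo class and its Run method are made-up names for illustration):

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Subjects;

public static class HotSubjectDemo
{
    // Returns the values seen by an observer that subscribes
    // after the first value has already been pumped.
    public static List<int> Run()
    {
        var source = new Subject<int>();
        var received = new List<int>();

        // No observers yet, so this value is simply dropped.
        source.OnNext(1);

        using (source.Subscribe(x => received.Add(x)))
        {
            // The observer is now attached, so this value is delivered.
            source.OnNext(2);
        }

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run())); // prints "2"
    }
}
```

Version 2 is in the same position as the first OnNext here: the action pumps before source.Subscribe has attached anyone.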

OnSubscribe – Version 3

We are almost there. Let’s make sure we’ve made the subscription first, before executing the action.

public static IObservable<T> OnSubscribed<T>(this IObservable<T> source,
                                             Action onSubscribed)
{
    return Observable.Create<T>(o =>
            {
                // Set up the subscription
                var subscription = source.Subscribe(o);
                
                // Execute action
                onSubscribed();

                // Return the subscription disposable
                return subscription;
            });
}

This works.
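One thing worth knowing about this version: the delegate passed to Observable.Create runs once for every Subscribe call, so the action fires per subscriber, not once per pipeline. A small sketch to illustrate, assuming the System.Reactive package is referenced; Version 3 is repeated so the block stands alone, and PerSubscriptionDemo is a made-up name:

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class OnSubscribedExtensions
{
    // Version 3 from above, repeated so this sketch is self-contained.
    public static IObservable<T> OnSubscribed<T>(this IObservable<T> source,
                                                 Action onSubscribed)
    {
        return Observable.Create<T>(o =>
        {
            var subscription = source.Subscribe(o);
            onSubscribed();
            return subscription;
        });
    }
}

public static class PerSubscriptionDemo
{
    // Returns the action's call count after declaration and after two subscriptions.
    public static int[] Run()
    {
        var source = new Subject<int>();
        var calls = 0;

        // Declaring the pipeline does not run the action.
        var watched = source.OnSubscribed(() => calls++);
        var afterDeclaration = calls;

        // Each Subscribe call runs the Observable.Create delegate again.
        using (watched.Subscribe(_ => { }))
        using (watched.Subscribe(_ => { }))
        {
            return new[] { afterDeclaration, calls };
        }
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run())); // prints "0,2"
    }
}
```

For the on-screen indicator use case that is probably what you want, but it matters if the pipeline is shared between several subscribers.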

However, there is a catch. If the onSubscribed action attempts to dispose of the subscription, the outer Subscribe call hasn’t returned the disposable yet, so the caller’s variable is still null and we’ll get a null reference exception.

[Test]
public void OnSubscribed_action_disposes_subscription_should_not_throw()
{
    // ARRANGE
    var source = new Subject<Unit>();

    IDisposable subscription = null;

    // ACT & ASSERT
    Assert.DoesNotThrow(() =>
            {
                subscription = source.OnSubscribed(() => subscription.Dispose())
                                     .Subscribe();
            });
}

Now this may seem to be getting into the realms of ridiculousness. Why would you want to tear down a subscription having just set it up? Alas, it is possible in theory for such a situation to exist – given a long pipeline and plenty of misdirection. So perhaps we should try to accommodate it?

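The whole point of this OnSubscribed call was to handle the case where SubscribeOn moves the subscription onto a background thread. In that case the above turns into a race condition between the calling thread assigning the variable and the background thread using it. See below.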

[Test]
public void OnSubscribed_action_disposes_subscription_on_background_thread_should_not_throw()
{
    // ARRANGE
    var source = new Subject<Unit>();

    IDisposable subscription = null;

    // ACT & ASSERT
    // Subscribing on a background thread introduces a race condition
    // between the assignment of the subscription variable, and its
    // usage to call Dispose.
    Assert.DoesNotThrow(() =>
            {
                subscription = source.OnSubscribed(() => subscription.Dispose())
                                     .SubscribeOn(TaskPoolScheduler.Default)
                                     .Subscribe();
            });
}

This test will now pass most of the time, but not reliably: the outcome depends on which thread gets there first.
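The null reference comes from the caller’s variable being unassigned when the action runs, so one way to sidestep it on the caller’s side (not inside the extension method itself) is to hand the action a SingleAssignmentDisposable from System.Reactive.Disposables. If it is disposed before its Disposable property is assigned, it disposes whatever is assigned later, and as far as I know it is written to tolerate that happening across threads. A hedged sketch, assuming the System.Reactive package is referenced and repeating Version 3 so the block stands alone (EarlyDisposeDemo is a made-up name):

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class OnSubscribedExtensions
{
    // Version 3 from above, repeated so this sketch is self-contained.
    public static IObservable<T> OnSubscribed<T>(this IObservable<T> source,
                                                 Action onSubscribed)
    {
        return Observable.Create<T>(o =>
        {
            var subscription = source.Subscribe(o);
            onSubscribed();
            return subscription;
        });
    }
}

public static class EarlyDisposeDemo
{
    // Returns whether the source still has observers after the
    // action has disposed of the subscription during set-up.
    public static bool Run()
    {
        var source = new Subject<int>();

        // Never null, so the action always has something to dispose.
        var subscription = new SingleAssignmentDisposable();

        // The action disposes the holder before the real disposable exists.
        // Assigning Disposable afterwards disposes the real one immediately.
        subscription.Disposable = source
            .OnSubscribed(() => subscription.Dispose())
            .Subscribe();

        return source.HasObservers;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints "False"
    }
}
```

This only protects callers who remember to use the holder. It does nothing about the ordering inside OnSubscribed itself, which is the question the conclusion picks up.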

Conclusion

We set out to build an extension method that detects when a subscription is set up, and we have got most of the way there. Our final version meets the requirements, with one caveat: if the action attempts to dispose of the subscription, it can run before the caller has received the disposable, which gives a null reference on a synchronous subscription and a race condition on a background one.

So can we execute the action in such a way that it is guaranteed to occur after the subscription is set up and the disposable returned? Perhaps we would need to somehow schedule it to occur after the disposable has returned? We’d also need to ensure that the onSubscribed action isn’t called if the subscription is torn down before the action gets its chance to run. To be continued…

Sharing the RX Goodness

Welcome to RXG

RX is a set of constructs that makes concurrent programming more functional and compositional in style.

RX can be pretty tricky.

My team and I work with RX every day as software developers for financial companies based in central London. We are always solving problems using it, so I thought I’d post some of the things we’ve learnt on this blog.

Hope you find some of it useful.