Pub Sub schema

Pub/Sub: C implementation

Note: this implementation is not optimized, because:

  • Unsubscribing only sets the corresponding entry of the subscriptions array to NULL. It would be better to remove NULL entries from the array.
  • Publishing a message is done sequentially: the message is delivered to the subscribers one after another, in the order in which they subscribed.

The code

c
//
// pubsub.c
// mpp-cpu
//
// Created by https://github.com/nullxx on 22/3/22.
//
#include "pubsub.h"
#include <stdio.h>
#include <stdlib.h>
// Table of pointers to every subscription created so far, indexed by
// subscription id. NULL until the first subscribe_to call; entries of removed
// subscriptions are meant to be reset to NULL by unsubscribe_for.
PubSubSubscription **subscriptions = NULL;
// Number of slots in `subscriptions`. It only ever grows, and its current value
// is the id handed to the next subscription.
unsigned int subscription_count = 0;
/**
 * Registers a callback for a topic.
 *
 * @param topic          Topic the subscription listens to.
 * @param on_message_fn  Callback that publish_message_to invokes for every
 *                       message published on `topic`.
 * @return The new subscription, or NULL if memory could not be allocated.
 *         The returned object is heap-allocated and released by
 *         unsubscribe_for. On failure the subscription table and the
 *         subscription counter are left unchanged.
 */
PubSubSubscription *subscribe_to(PubSubTopic topic, on_message on_message_fn) {
    PubSubSubscription *subscription = malloc(sizeof *subscription);
    if (subscription == NULL) {
        return NULL;
    }

    // The table stores pointers, so each slot is sized as a pointer. Grow it
    // through a temporary: if realloc fails, the old table is still valid and
    // still referenced by `subscriptions`.
    size_t new_count = (size_t)subscription_count + 1;
    PubSubSubscription **grown = realloc(subscriptions, new_count * sizeof *grown);
    if (grown == NULL) {
        free(subscription);
        return NULL;
    }
    subscriptions = grown;

    // Only commit the new id once the slot for it exists.
    subscription->id = subscription_count++; // id is also the index inside subscriptions
    subscription->topic = topic;
    subscription->on_message_fn = on_message_fn;
    subscriptions[subscription->id] = subscription;
    return subscription;
}
/**
 * Removes a subscription and releases its memory.
 *
 * @param sub  Subscription previously returned by subscribe_to. It must still
 *             be live: passing a pointer that was already unsubscribed is
 *             undefined behavior, because its id is read before anything else.
 * @return true if the subscription was found in the table, removed and freed;
 *         false if `sub` is NULL, its id is outside the table, or the table
 *         slot for that id does not hold `sub` (nothing is freed in that case).
 *
 * The slot is only set to NULL; the table itself is never shrunk.
 */
bool unsubscribe_for(PubSubSubscription *sub) {
    if (sub == NULL) {
        return false;
    }

    // Capture the id while `sub` is still valid; it must not be read after free().
    const unsigned int sub_id = sub->id;
    if (sub_id >= subscription_count || subscriptions[sub_id] != sub) {
        return false;
    }

    // Clear the slot first so the table never holds a dangling pointer.
    subscriptions[sub_id] = NULL;
    free(sub);
    return true;
}
/**
 * Delivers a message to every subscription of a topic.
 *
 * Callbacks are invoked synchronously, one after another, in table order
 * (i.e. in the order the subscriptions were created).
 *
 * @param topic  Topic to publish on.
 * @param value  Payload pointer, handed to each callback unchanged inside a
 *               PubSubMessage; it is never dereferenced here.
 * @return Number of callbacks that were invoked.
 */
int publish_message_to(PubSubTopic topic, void *value) {
    const PubSubMessage message = {
        .topic = topic,
        .value = value
    };
    int sent = 0;

    // The index is unsigned to match subscription_count and avoid a
    // signed/unsigned comparison. The table entry is re-read on every
    // iteration, so the loop does not hold on to a stale table pointer.
    for (unsigned int i = 0; i < subscription_count; i++) {
        const PubSubSubscription *sub = subscriptions[i];

        // Skip removed slots, other topics, and subscriptions registered
        // without a callback (calling a NULL function pointer would crash).
        if (sub == NULL || sub->topic != topic || sub->on_message_fn == NULL) {
            continue;
        }

        sub->on_message_fn(message);
        sent++;
    }
    return sent;
}
c
//
// pubsub.h
// mpp-cpu
//
// Created by https://github.com/nullxx on 22/3/22.
//
#ifndef pubsub_h
#define pubsub_h
#include <stdbool.h>
// Topics a subscription can listen to and a message can be published on.
typedef enum {
NONE_PUBSUB_TOPIC = 0, // presumably a "no topic" placeholder; not used by the code shown here
DATA_BUS_TOPIC // topic used by the test program; presumably data-bus events (confirm)
} PubSubTopic;
// Message passed by value to subscriber callbacks when something is published.
typedef struct {
PubSubTopic topic; // topic the message was published on
void *value; // payload pointer supplied by the publisher; its meaning is up to publisher and subscriber
} PubSubMessage;
// Subscriber callback: receives a copy of each message published on the subscribed topic.
typedef void (*on_message)(PubSubMessage);
// A registered subscription, as returned by subscribe_to.
typedef struct {
unsigned int id; // assigned by subscribe_to; also the subscription's index in the internal table
PubSubTopic topic; // topic this subscription listens to
on_message on_message_fn; // callback invoked for each message published on `topic`
} PubSubSubscription;
// Registers on_message_fn for `topic`. Returns the new heap-allocated
// subscription, or NULL if it could not be allocated.
PubSubSubscription *subscribe_to(PubSubTopic topic, on_message on_message_fn);
// Removes and frees a subscription obtained from subscribe_to; `sub` must not
// be used afterwards. Returns true when it was removed, false when the request
// was rejected (for example `sub` is NULL or its id is outside the table).
bool unsubscribe_for(PubSubSubscription *sub);
// Synchronously invokes the callback of every subscription on `topic`, passing
// `value` through as the message payload. Returns the number of callbacks invoked.
int publish_message_to(PubSubTopic topic, void *value);
#endif /* pubsub_h */
c
//
// pubsubtest.c
// mpp-cpu
//
// Created by https://github.com/nullxx on 22/3/22.
//
#include <stdio.h>
#include "pubsub.h"
// Test subscriber: prints the topic number and the payload of a received
// message. The payload is interpreted as a NUL-terminated string, which is
// what main() publishes.
void on_message_fn(PubSubMessage message) {
    const char *text = message.value;

    printf("Message Topic: %d\n"
           "Message Value: %s\n",
           message.topic, text);
}
// Demo / smoke test: subscribes to DATA_BUS_TOPIC, publishes once while
// subscribed and twice after unsubscribing, then prints how many subscribers
// each publish reached.
int main(void) {
    PubSubSubscription *sub = subscribe_to(DATA_BUS_TOPIC, &on_message_fn);
    if (sub == NULL) {
        printf("Error subscribing\n");
        // Returning from main is equivalent to exit() with the same status and
        // does not depend on <stdlib.h>, which this file never included.
        return -1;
    }

    // publish a message
    int published_to1 = publish_message_to(DATA_BUS_TOPIC, (void*) "Message");

    // unsubscribe from a topic; `sub` is freed and must not be used afterwards
    unsubscribe_for(sub);

    // publishing a message again
    int published_to2 = publish_message_to(DATA_BUS_TOPIC, (void*) "Message2");

    // ... and again
    int published_to3 = publish_message_to(DATA_BUS_TOPIC, (void*) "Message3");

    printf("published_to1 %d\n", published_to1); // 1, received by on_message_fn
    printf("published_to2 %d\n", published_to2); // 0, after unsubscribing, NOT received by on_message_fn
    printf("published_to3 %d\n", published_to3); // ... same!

    return 0;
}