glixir/syn

Distributed process coordination using Erlang’s syn library.

Provides type-safe wrappers for syn’s registry and PubSub functionality, enabling distributed service discovery and event streaming across BEAM nodes.

Inspired by the excellent glyn project’s syn wrapper patterns. Check out glyn for more advanced OTP patterns: https://github.com/glyn-project/glyn

This implementation is optimized for glixir’s type-safe OTP wrappers.

Types

PubSub operation errors

pub type PubSubError {
  JoinFailed(String)
  LeaveFailed(String)
  PublishFailed(String)
}

Constructors

  • JoinFailed(String)
  • LeaveFailed(String)
  • PublishFailed(String)

Registry operation errors

pub type RegistryError {
  RegistrationFailed(String)
  LookupFailed(String)
  UnregistrationFailed(String)
}

Constructors

  • RegistrationFailed(String)
  • LookupFailed(String)
  • UnregistrationFailed(String)
pub type SynIsMemberResult {
  SynIsMemberTrue
  SynIsMemberFalse
  SynIsMemberError(dynamic.Dynamic)
}

Constructors

  • SynIsMemberTrue
  • SynIsMemberFalse
  • SynIsMemberError(dynamic.Dynamic)

pub type SynLookupResult {
  SynLookupNotFound
  SynLookupFound(process.Pid, dynamic.Dynamic)
  SynLookupError(dynamic.Dynamic)
}

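Constructors

  • SynLookupNotFound
  • SynLookupFound(process.Pid, dynamic.Dynamic)
  • SynLookupError(dynamic.Dynamic)
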
pub type SynMembersResult {
  SynMembersEmpty
  SynMembersList(List(#(String, process.Pid, dynamic.Dynamic)))
  SynMembersError(dynamic.Dynamic)
}

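Constructors

  • SynMembersEmpty
  • SynMembersList(List(#(String, process.Pid, dynamic.Dynamic)))
  • SynMembersError(dynamic.Dynamic)
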
pub type SynRegisterResult {
  SynRegisterOk
  SynRegisterError(dynamic.Dynamic)
}

Constructors

  • SynRegisterOk
  • SynRegisterError(dynamic.Dynamic)

pub type SynUnregisterResult {
  SynUnregisterOk
  SynUnregisterError(dynamic.Dynamic)
}

Constructors

  • SynUnregisterOk
  • SynUnregisterError(dynamic.Dynamic)

pub type SynWhereisResult {
  SynNotFound
  SynFoundPidOnly(process.Pid)
  SynFoundWithMetadata(process.Pid, dynamic.Dynamic)
  SynError(dynamic.Dynamic)
}

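Constructors

  • SynNotFound
  • SynFoundPidOnly(process.Pid)
  • SynFoundWithMetadata(process.Pid, dynamic.Dynamic)
  • SynError(dynamic.Dynamic)
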
pub type WhereisResult {
  NotFound
  FoundPidOnly(process.Pid)
  FoundWithMetadata(process.Pid, dynamic.Dynamic)
  WhereisError(String)
}

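Constructors

  • NotFound
  • FoundPidOnly(process.Pid)
  • FoundWithMetadata(process.Pid, dynamic.Dynamic)
  • WhereisError(String)
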
Values

pub fn broadcast_status(
  group: String,
  status_message: json.Json,
) -> Result(Int, PubSubError)

Broadcast a status update to all processes in a coordination group. Common pattern for distributed consensus or health monitoring.

Example

let status = json.object([
  #("node", json.string(node_name)),
  #("load", json.float(cpu_load)),
  #("timestamp", json.int(now))
])

syn.broadcast_status("health_check", status)
pub fn find_worker(
  pool_name: String,
  worker_id: String,
) -> Result(#(process.Pid, load_info), RegistryError)

Find a worker in a pool and get its load information.

Example

case syn.find_worker("image_processors", worker_id) {
  Ok(#(pid, #(cpu_usage, queue_size))) -> // Send work to worker
  Error(_) -> // Worker not available
}
pub fn init_scopes(scopes: List(String)) -> Nil

Initialize scopes for distributed coordination. Call this once at application startup, passing every scope you plan to use.

Example

syn.init_scopes(["user_sessions", "game_lobbies", "worker_pool"])
pub fn is_member(
  scope: String,
  name: String,
  pid: process.Pid,
) -> Bool
pub fn join(
  scope: String,
  group: String,
) -> Result(Nil, PubSubError)

Join a PubSub group to receive published messages. The current process will receive all messages published to this group.

Example

syn.join("chat_rooms", "general")
// Process will now receive all messages published to "general" chat
pub fn join_coordination(
  group: String,
) -> Result(Nil, PubSubError)

Join a coordination group for distributed algorithm participation. Common pattern for consensus, leader election, or load balancing.

Example

syn.join_coordination("leader_election")
// Process will receive leader election messages
pub fn leave(
  scope: String,
  group: String,
) -> Result(Nil, PubSubError)

Leave a PubSub group to stop receiving messages. The current process will no longer receive messages published to this group.

Example

syn.leave("chat_rooms", "general")
pub fn leave_coordination(
  group: String,
) -> Result(Nil, PubSubError)

Leave a coordination group.

Example

syn.leave_coordination("leader_election") 
pub fn lookup(
  scope: String,
  name: String,
) -> Result(#(process.Pid, metadata), RegistryError)
pub fn member_count(scope: String, group: String) -> Int

Get the count of processes subscribed to a group. More efficient than getting the full member list if you only need the count.

Example

let user_count = syn.member_count("chat_rooms", "general")
case user_count > 100 {
  True -> // Maybe split the room
  False -> Nil
}
pub fn members(
  scope: String,
  group: String,
) -> List(#(process.Pid, metadata))

Get a list of member PIDs and metadata, without names. A simpler version of members_detailed for when you don’t need member names.

Example

let followers = syn.members("JobDispatcherFollowers", "dispatchers")
// Returns: [#(pid1, metadata1), #(pid2, metadata2)]
pub fn members_detailed(
  scope: String,
  group: String,
) -> List(#(String, process.Pid, metadata))
pub fn publish(
  scope: String,
  group: String,
  message: String,
) -> Result(Int, PubSubError)

Publish a string message to all members of a PubSub group. Returns the number of processes the message was delivered to.

Example

case syn.publish("chat_rooms", "general", "Hello everyone!") {
  Ok(count) -> // Message sent to `count` processes
  Error(_) -> // Publish failed
}
pub fn publish_json(
  scope: String,
  group: String,
  message: message,
  encoder: fn(message) -> json.Json,
) -> Result(Int, PubSubError)

Publish a typed message using JSON encoding. The message will be JSON-encoded before sending to subscribers.

Example

let user_message = json.object([
  #("user", json.string("alice")),
  #("content", json.string("Hello!")),
  #("timestamp", json.int(system_time))
])

syn.publish_json("chat_rooms", "general", user_message, fn(j) { j })
pub fn register(
  scope: String,
  name: String,
  metadata: metadata,
) -> Result(Nil, RegistryError)
pub fn register_worker(
  pool_name: String,
  worker_id: String,
  load_info: load_info,
) -> Result(Nil, RegistryError)

Register a worker process with load information. Common pattern for distributed worker pools.

Example

syn.register_worker("image_processors", worker_id, #(cpu_usage, queue_size))
pub fn unregister(
  scope: String,
  name: String,
) -> Result(Nil, RegistryError)
pub fn whereis(
  scope: String,
  name: String,
) -> Result(#(process.Pid, metadata), RegistryError)

Look up a registered process by scope and name. Returns the process PID and its metadata if found.

Example

case syn.whereis("user_sessions", user_id) {
  Ok(#(pid, #(username, last_active))) -> // Send message to user
  Error(_) -> // User not online
}
Search Document