glixir/syn
Distributed process coordination using Erlang’s syn library.
Provides type-safe wrappers for syn’s registry and PubSub functionality, enabling distributed service discovery and event streaming across BEAM nodes.
Inspired by the excellent glyn project’s syn wrapper patterns. Check out glyn for more advanced OTP patterns: https://github.com/glyn-project/glyn
This implementation is optimized for glixir’s type-safe OTP wrappers.
Types
PubSub operation errors
pub type PubSubError {
JoinFailed(String)
LeaveFailed(String)
PublishFailed(String)
}
Constructors
-
JoinFailed(String) -
LeaveFailed(String) -
PublishFailed(String)
Registry operation errors
pub type RegistryError {
RegistrationFailed(String)
LookupFailed(String)
UnregistrationFailed(String)
}
Constructors
-
RegistrationFailed(String) -
LookupFailed(String) -
UnregistrationFailed(String)
pub type SynIsMemberResult {
SynIsMemberTrue
SynIsMemberFalse
SynIsMemberError(dynamic.Dynamic)
}
Constructors
-
SynIsMemberTrue -
SynIsMemberFalse -
SynIsMemberError(dynamic.Dynamic)
pub type SynLookupResult {
SynLookupNotFound
SynLookupFound(process.Pid, dynamic.Dynamic)
SynLookupError(dynamic.Dynamic)
}
Constructors
-
SynLookupNotFound -
SynLookupFound(process.Pid, dynamic.Dynamic) -
SynLookupError(dynamic.Dynamic)
pub type SynMembersResult {
SynMembersEmpty
SynMembersList(List(#(String, process.Pid, dynamic.Dynamic)))
SynMembersError(dynamic.Dynamic)
}
Constructors
-
SynMembersEmpty -
SynMembersList(List(#(String, process.Pid, dynamic.Dynamic))) -
SynMembersError(dynamic.Dynamic)
pub type SynRegisterResult {
SynRegisterOk
SynRegisterError(dynamic.Dynamic)
}
Constructors
-
SynRegisterOk -
SynRegisterError(dynamic.Dynamic)
pub type SynUnregisterResult {
SynUnregisterOk
SynUnregisterError(dynamic.Dynamic)
}
Constructors
-
SynUnregisterOk -
SynUnregisterError(dynamic.Dynamic)
pub type SynWhereisResult {
SynNotFound
SynFoundPidOnly(process.Pid)
SynFoundWithMetadata(process.Pid, dynamic.Dynamic)
SynError(dynamic.Dynamic)
}
Constructors
-
SynNotFound -
SynFoundPidOnly(process.Pid) -
SynFoundWithMetadata(process.Pid, dynamic.Dynamic) -
SynError(dynamic.Dynamic)
pub type WhereisResult {
NotFound
FoundPidOnly(process.Pid)
FoundWithMetadata(process.Pid, dynamic.Dynamic)
WhereisError(String)
}
Constructors
-
NotFound -
FoundPidOnly(process.Pid) -
FoundWithMetadata(process.Pid, dynamic.Dynamic) -
WhereisError(String)
Values
pub fn broadcast_status(
group: String,
status_message: json.Json,
) -> Result(Int, PubSubError)
Broadcast a status update to all processes in a coordination group. Common pattern for distributed consensus or health monitoring.
Example
let status = json.object([
#("node", json.string(node_name)),
#("load", json.float(cpu_load)),
#("timestamp", json.int(now))
])
syn.broadcast_status("health_check", status)
pub fn find_worker(
pool_name: String,
worker_id: String,
) -> Result(#(process.Pid, load_info), RegistryError)
Find a worker in a pool and get its load information.
Example
case syn.find_worker("image_processors", worker_id) {
Ok(#(pid, #(cpu_usage, queue_size))) -> // Send work to worker
Error(_) -> // Worker not available
}
pub fn init_scopes(scopes: List(String)) -> Nil
Initialize scopes for distributed coordination. Call this once at application startup for each scope you plan to use.
Example
syn.init_scopes(["user_sessions", "game_lobbies", "worker_pool"])
pub fn is_member(
scope: String,
name: String,
pid: process.Pid,
) -> Bool
pub fn join(
scope: String,
group: String,
) -> Result(Nil, PubSubError)
Join a PubSub group to receive published messages. The current process will receive all messages published to this group.
Example
syn.join("chat_rooms", "general")
// Process will now receive all messages published to "general" chat
pub fn join_coordination(
group: String,
) -> Result(Nil, PubSubError)
Join a coordination group for distributed algorithm participation. Common pattern for consensus, leader election, or load balancing.
Example
syn.join_coordination("leader_election")
// Process will receive leader election messages
pub fn leave(
scope: String,
group: String,
) -> Result(Nil, PubSubError)
Leave a PubSub group to stop receiving messages. The current process will no longer receive messages published to this group.
Example
syn.leave("chat_rooms", "general")
pub fn leave_coordination(
group: String,
) -> Result(Nil, PubSubError)
Leave a coordination group.
Example
syn.leave_coordination("leader_election")
pub fn lookup(
scope: String,
name: String,
) -> Result(#(process.Pid, metadata), RegistryError)
pub fn member_count(scope: String, group: String) -> Int
Get count of processes subscribed to a group. More efficient than getting the full member list if you only need the count.
Example
let user_count = syn.member_count("chat_rooms", "general")
if user_count > 100 {
// Maybe split the room
}
pub fn members(
scope: String,
group: String,
) -> List(#(process.Pid, metadata))
Get list of member PIDs and metadata without names. Simpler version of members_detailed for when you don’t need member names.
Example
let followers = syn.members("JobDispatcherFollowers", "dispatchers")
// Returns: [#(pid1, metadata1), #(pid2, metadata2)]
pub fn members_detailed(
scope: String,
group: String,
) -> List(#(String, process.Pid, metadata))
pub fn publish(
scope: String,
group: String,
message: String,
) -> Result(Int, PubSubError)
Publish a string message to all members of a PubSub group. Returns the number of processes the message was delivered to.
Example
case syn.publish("chat_rooms", "general", "Hello everyone!") {
Ok(count) -> // Message sent to `count` processes
Error(_) -> // Publish failed
}
pub fn publish_json(
scope: String,
group: String,
message: message,
encoder: fn(message) -> json.Json,
) -> Result(Int, PubSubError)
Publish a typed message using JSON encoding. The message will be JSON-encoded before sending to subscribers.
Example
let user_message = json.object([
#("user", json.string("alice")),
#("content", json.string("Hello!")),
#("timestamp", json.int(system_time))
])
syn.publish_json("chat_rooms", "general", user_message, fn(j) { j })
pub fn register(
scope: String,
name: String,
metadata: metadata,
) -> Result(Nil, RegistryError)
pub fn register_worker(
pool_name: String,
worker_id: String,
load_info: load_info,
) -> Result(Nil, RegistryError)
Register a worker process with load information. Common pattern for distributed worker pools.
Example
syn.register_worker("image_processors", worker_id, #(cpu_usage, queue_size))
pub fn unregister(
scope: String,
name: String,
) -> Result(Nil, RegistryError)
pub fn whereis(
scope: String,
name: String,
) -> Result(#(process.Pid, metadata), RegistryError)
Look up a registered process by scope and name. Returns the process PID and its metadata if found.
Example
case syn.whereis("user_sessions", user_id) {
Ok(#(pid, #(username, last_active))) -> // Send message to user
Error(_) -> // User not online
}