Linker is a tiny networking toolkit with three purpose-built channels: a Control-Channel for typed messages, a Data-Channel that auto-opens per request for bulk transfers, and a Stream-Channel for realtime payloads (audio, video, telemetry). Built for millisecond-level latency, bounded back-pressure, and drop-in AES/RSA security—without heavy frameworks.
25 ms
PCM) with bounded queues and drop-oldest to keep latency flat.Semantics: request/response messages (Message
).
Use for: RPC, signaling, metadata, channel negotiation.
Guarantees: encrypted, ordered, lightweight.
Semantics: short-lived, dedicated pipe automatically opened by Linker for a single heavy transfer.
Use for: file upload/download, model blobs, snapshots.
Behavior: encrypted, back-pressured, auto-closes; run many in parallel.
Semantics: long-lived, framed stream (sequenced/timestamped) with tight jitter.
Use for: audio, video, realtime telemetry.
Behavior: tiny frames, bounded queues, mix-minus for group calls.
import com.talpie.linker.*;
import java.sql.Timestamp;
import java.util.UUID;
public class ClientMain {
private static final UUID STREAM_ID = UUID.fromString("11111111-2222-3333-4444-555555555555");
public static void main(String[] args) {
var linker = new Linker("localhost", 1234);
var client = (ClientService) linker.getService();
client.getListenersHandlers().register(new ClientListeners.Lifecycle() {
@Override public void onStart(Object caller, Timestamp ts) { System.out.println("started"); }
@Override public void onError(Object caller, Timestamp ts, Throwable e) { e.printStackTrace(); }
});
client.getListenersHandlers().register(new ClientListeners.Ping() {
@Override public byte[] feedPing(){ return "HELLO".getBytes(java.nio.charset.StandardCharsets.UTF_8); }
@Override public void onPong(Object c, Timestamp ts, Message m){ System.out.println("pong: " + m.getPayloadString()); }
});
linker.start();
// Control request (async)
client.sendRequest("#demo/echo", "Linker".getBytes())
.thenAccept(reply -> System.out.println("reply: " + reply.getPayloadString()))
.exceptionally(ex -> { ex.printStackTrace(); return null; });
// Open a stream (async)
client.openStreamAsync(STREAM_ID.toString(), /*queue*/16, /*timeoutMs*/4000)
.thenAccept(sc -> System.out.println("stream ready: " + sc.getStreamId()))
.exceptionally(ex -> { ex.printStackTrace(); return null; });
}
}
import com.talpie.linker.*;
import com.talpie.linker.audio.*;
import java.sql.Timestamp;
public class ServerMain {
public static void main(String[] args) {
var linker = new Linker(1234);
var server = (ServerService) linker.getService();
// Control echo
server.getListenersHandlers().register(new ServerListeners.ControlMessage() {
@Override public Message onRequest(Object caller, Timestamp ts, ClientHandler ch, Message m) {
return m.setResponse(("OK: " + m.getPayloadString()).getBytes(java.nio.charset.StandardCharsets.UTF_8));
}
});
// Stream echo
server.getListenersHandlers().register(new ServerListeners.StreamMessage() {
@Override public void streamFrameIn(Object c, Timestamp t, ClientHandler ch, StreamSocketServer ss, StreamFrame f, long onWire) {
server.sendThroughStream(f.streamId, f);
}
});
linker.start();
}
}
You call sendRequest(...)
. Linker may spin up a short-lived Data-Channel automatically for that request.
You handle it via listeners. No manual open/close.
// Kick off upload via Control-Channel
client.sendRequest("#files/upload", metaBytes)
.thenAccept(ack -> System.out.println("ACK: " + ack.getPayloadString()))
.exceptionally(ex -> { ex.printStackTrace(); return null; });
// Auto-opened Data-Channel listener
client.getListenersHandlers().register(new ClientListeners.DataSocket() {
@Override public void dataOpen(Object caller, java.sql.Timestamp ts, DataSocketClient dc) {
try (dc) {
// Send bytes (encrypted, back-pressured)
// dc.getOutputStream().write(buffer);
} catch (Exception e) { e.printStackTrace(); }
}
@Override public void dataClose(Object caller, java.sql.Timestamp ts, DataSocketClient dc) {
System.out.println("[DATA] closed");
}
});
// Control handler acknowledges; Linker will auto-open Data-Channel
server.getListenersHandlers().register(new ServerListeners.ControlMessage() {
@Override public Message onRequest(Object caller, java.sql.Timestamp ts, ClientHandler ch, Message m) {
return m.setResponse("OK".getBytes(java.nio.charset.StandardCharsets.UTF_8));
}
});
// Handle opened Data-Channel
server.getListenersHandlers().register(new ServerListeners.DataSocket() {
@Override public void dataOpen(Object caller, java.sql.Timestamp ts, ClientHandler ch, DataSocketServer dc) {
try (dc) {
// Receive bytes
// dc.getInputStream().transferTo(fileOut);
} catch (Exception e) { e.printStackTrace(); }
}
});
Tip: run multiple bulk requests in parallel — control traffic stays snappy.
client.getListenersHandlers().register(new ClientListeners.Ping() {
@Override public byte[] feedPing(){ return "HELLO".getBytes(java.nio.charset.StandardCharsets.UTF_8); }
@Override public void onPong(Object c, java.sql.Timestamp ts, Message m){ System.out.println("[pong] " + m.getPayloadString()); }
});
server.getListenersHandlers().register(new ServerListeners.Ping() {
@Override public byte[] onPing(Object caller, java.sql.Timestamp ts, ClientHandler ch, byte[] payload) {
return payload; // echo
}
});
client.sendRequest("#demo/hello", "Linker".getBytes())
.thenAccept(reply -> System.out.println("reply: " + reply.getPayloadString()))
.exceptionally(ex -> { ex.printStackTrace(); return null; });
server.getListenersHandlers().register(new ServerListeners.ControlMessage() {
@Override public Message onRequest(Object caller, java.sql.Timestamp ts, ClientHandler ch, Message m) {
if ("#demo/hello".equals(m.getHeader())) {
return m.setResponse(("Hi, " + m.getPayloadString() + "!").getBytes(java.nio.charset.StandardCharsets.UTF_8));
}
return m.setResponse(("Unknown route: " + m.getHeader()).getBytes());
}
});
client.getListenersHandlers().register(new ClientListeners.ControlMessage() {
@Override public Message onRequest(Object caller, java.sql.Timestamp ts, Message m) {
if ("#server/push".equals(m.getHeader())) {
System.out.println("push: " + m.getPayloadString());
return m.setResponse("OK".getBytes());
}
return m;
}
});
server.getListenersHandlers().register(new ServerListeners.ControlMessage() {
@Override public Message onRequest(Object caller, java.sql.Timestamp ts, ClientHandler ch, Message m) {
ch.sendRequest("#server/push", "payload".getBytes())
.thenAccept(resp -> System.out.println("pushed, client replied: " + resp.getPayloadString()))
.exceptionally(ex -> { ex.printStackTrace(); return null; });
return m.setResponse("ACK".getBytes()); // fast ack
}
});
client.getListenersHandlers().register(new ClientListeners.ControlMessage() {
@Override public Message onRequest(Object caller, java.sql.Timestamp ts, Message m) {
if ("#server/broadcast".equals(m.getHeader())) {
System.out.println("broadcast: " + m.getPayloadString());
return m.setResponse("OK".getBytes());
}
return m;
}
});
var clients = new java.util.concurrent.ConcurrentHashMap();
server.getListenersHandlers().register(new ServerListeners.Connection() {
@Override public void onClientConnected(Object c, java.sql.Timestamp ts, ClientHandler ch) {
clients.put(ch.getClientUuid(), ch);
}
@Override public void onClientDisconnected(Object c, java.sql.Timestamp ts, ClientHandler ch, Throwable cause) {
clients.remove(ch.getClientUuid());
}
});
void broadcast(String route, byte[] payload) {
clients.values().forEach(ch -> ch.sendRequest(route, payload).exceptionally(ex -> null));
}
client.openStreamAsync(STREAM_ID.toString(), 16, 4000)
.thenAccept(sc -> System.out.println("stream ready: " + sc.getStreamId()))
.exceptionally(ex -> { ex.printStackTrace(); return null; });
client.getListenersHandlers().register(new ClientListeners.StreamMessage() {
@Override public void streamFrame(Object c, java.sql.Timestamp ts, StreamSocketClient sc, StreamFrame f) {
// handle frames
}
});
server.getListenersHandlers().register(new ServerListeners.StreamMessage() {
@Override public void streamFrameIn(Object c, java.sql.Timestamp ts, ClientHandler ch, StreamSocketServer ss, StreamFrame f, long onWire) {
server.sendThroughStream(f.streamId, f); // echo/relay
}
});
var fmt = new javax.sound.sampled.AudioFormat(48000f, 16, 1, true, false);
var micQ = new java.util.concurrent.LinkedBlockingQueue<byte[]>(4);
new com.talpie.linker.audio.MicCollector(micQ, inputMixerInfo, fmt).start();
client.openStreamAsync(STREAM_ID.toString(), 16, 4000)
.thenAccept(sc -> new com.talpie.linker.audio.StreamOutboundSender(client, STREAM_ID, micQ).start());
client.getListenersHandlers().register(new ClientListeners.StreamMessage() {
@Override public void streamFrame(Object c, java.sql.Timestamp ts, StreamSocketClient sc, StreamFrame f) {
inboundDemux.accept(java.util.UUID.fromString(f.streamId), f.payload);
}
});
new com.talpie.linker.audio.SpeakerPlayer(inboundDemux.getQueue(STREAM_ID), outputMixerInfo, fmt).start();
server.getListenersHandlers().register(new ServerListeners.StreamMessage() {
@Override public void streamFrameIn(Object c, java.sql.Timestamp ts, ClientHandler ch, StreamSocketServer ss, StreamFrame f, long onWire) {
// echo or feed into RoomMixer queues for mix-minus
server.sendThroughStream(f.streamId, f);
}
});
// Same as audio example: mic → STREAM_ID, demux → speaker.
// Server returns mix-minus per participant on the same stream id.
UUID roomId = UUID.randomUUID();
var room = new com.talpie.server.RoomMixer("#call/audio-room:" + roomId, (int)(48000 * 0.025) * 2);
room.start();
room.join(clientHandler, /*senderId*/ senderUuid);
server.getListenersHandlers().register(new ServerListeners.StreamMessage() {
@Override public void streamFrameIn(Object c, java.sql.Timestamp ts, ClientHandler ch, StreamSocketServer ss, StreamFrame f, long onWire) {
var q = room.getInput(senderUuid);
if (q != null) q.offer(f.payload);
}
});
Message
(encrypted ASCII header + binary payload).#_START-STREAM-SOCKET
→ #_STREAM-SOCKET-READY
, then framed StreamFrame
.#_START-DATA-SOCKET
→ #_DATA-SOCKET-READY
, then bulk flow and auto-close.com.talpie.linker.Linker
— bootstrap (client/server)ClientService
, ServerService
— main servicesMessage
— request/response; encrypted header+payloadStreamSocketClient
, StreamSocketServer
— stream endpointsStreamFrame
— seq + timestamp + payloadMicCollector
, SpeakerPlayer
, InboundDemux
, StreamOutboundSender
RoomMixer
(mix-minus)// Requests (always async)
java.util.concurrent.CompletableFuture<Message> sendRequest(String route, byte[] data);
java.util.concurrent.CompletableFuture<Message> sendRequest(String route, byte[] data, long timeoutMillis);
java.util.concurrent.CompletableFuture<Message> sendControlRequest(String route, byte[] data, long timeoutMillis);
// Streams
java.util.concurrent.CompletableFuture<StreamSocketClient> openStreamAsync(String streamId, int queue, long timeoutMs);
void sendThroughStream(String streamId, StreamFrame frame);
java.util.concurrent.CompletableFuture<Void> closeStreamAsync(String streamId, long timeoutMs);
// Targeted requests (async)
java.util.concurrent.CompletableFuture<Message> sendRequestTo(java.util.UUID clientId, String route, byte[] data, long timeoutMillis);
java.util.concurrent.CompletableFuture<Message> sendControlRequestTo(java.util.UUID clientId, String route, byte[] data, long timeoutMillis);
// Broadcast helper
java.util.concurrent.CompletableFuture<java.util.Map<java.util.UUID, Message>> broadcastRequest(String route, byte[] data, long timeoutMillis);
// Streams
java.util.concurrent.CompletableFuture<StreamSocketServer> openStreamFor(java.util.UUID clientId, String streamId, int queue, long timeoutMs);
void sendThroughStream(String streamId, StreamFrame frame);
java.util.concurrent.CompletableFuture<Void> closeStreamFor(java.util.UUID clientId, String streamId, long timeoutMs);
javax.sound.sampled
devices (mixers) for capture/playback
Provided under CC BY-ND 4.0.
Commercial use allowed with attribution; no derivative works.
Full text: https://creativecommons.org/licenses/by-nd/4.0/
linker-1.0.0.jar
to your classpath.