Encrypted, low-latency control, data & streaming for Java.

Linker is a tiny networking toolkit with three purpose-built channels: a Control-Channel for typed messages, a Data-Channel that auto-opens per request for bulk transfers, and a Stream-Channel for realtime payloads (audio, video, telemetry). Built for millisecond-level latency, bounded back-pressure, and drop-in AES/RSA security—without heavy frameworks.

Java 17+ · AES/RSA · 25 ms frames

Download

Direct JAR

linker-1.0.0.jar
Host it directly, e.g. /linker-1.0.0.jar.


Why Linker?

  • Three channels, one API: Control, auto-open Data (per request), and Stream—separate pipes that don’t block each other.
  • Snappy by default: tiny frames (e.g. 25 ms PCM) with bounded queues and a drop-oldest policy, so a slow consumer cannot build up a backlog and latency stays flat.
  • Secure transport: RSA handshake/signing + AES with a fresh IV/nonce per message (control, data, stream).
  • Simple listeners: register small handlers; no containers, no ceremony.
  • Batteries for audio: mic capture, speaker playback, demux, and server-side mix-minus rooms included.
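
The bounded-queue, drop-oldest idea from the list above can be sketched with the standard library alone. This is an illustration of the general technique, not Linker's internal implementation; the class name DropOldestQueue is hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;

/** Hypothetical helper (not part of Linker): a bounded queue that evicts the oldest element when full. */
final class DropOldestQueue<T> {
    private final ArrayBlockingQueue<T> queue;

    DropOldestQueue(int capacity) {
        this.queue = new ArrayBlockingQueue<>(capacity);
    }

    /** Adds an item; returns how many older items were dropped to make room. */
    synchronized int offer(T item) {
        int dropped = 0;
        while (!queue.offer(item)) {
            // Queue is full: discard the oldest element so the newest one always gets in.
            if (queue.poll() != null) {
                dropped++;
            }
        }
        return dropped;
    }

    /** Returns the oldest queued item, or null if the queue is empty. */
    T poll() {
        return queue.poll();
    }

    int size() {
        return queue.size();
    }
}
```

With a capacity of 16 frames at 25 ms each, such a queue never holds more than about 400 ms of audio, which is what keeps worst-case latency bounded.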

Channels at a glance

Control-Channel

Semantics: request/response messages (Message).

Use for: RPC, signaling, metadata, channel negotiation.

Guarantees: encrypted, ordered, lightweight.

Data-Channel (per request, auto-open)

Semantics: short-lived, dedicated pipe automatically opened by Linker for a single heavy transfer.

Use for: file upload/download, model blobs, snapshots.

Behavior: encrypted, back-pressured, auto-closes; run many in parallel.

Stream-Channel

Semantics: long-lived, framed stream (sequenced/timestamped) with tight jitter.

Use for: audio, video, realtime telemetry.

Behavior: tiny frames, bounded queues, mix-minus for group calls.
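
To make "framed stream (sequenced/timestamped)" concrete, here is a minimal sketch of one possible frame layout. The field order, sizes, and the name DemoFrame are assumptions for illustration; this document does not specify Linker's actual StreamFrame wire format.

```java
import java.nio.ByteBuffer;

/** Illustrative frame layout (an assumption, not Linker's actual wire format). */
record DemoFrame(long seq, long timestampMillis, byte[] payload) {

    /** Encodes as: 8-byte seq, 8-byte timestamp, 4-byte payload length, payload bytes (big-endian). */
    byte[] encode() {
        ByteBuffer buf = ByteBuffer.allocate(Long.BYTES + Long.BYTES + Integer.BYTES + payload.length);
        buf.putLong(seq).putLong(timestampMillis).putInt(payload.length).put(payload);
        return buf.array();
    }

    /** Decodes a frame produced by encode(); rejects inconsistent length fields. */
    static DemoFrame decode(byte[] wire) {
        ByteBuffer buf = ByteBuffer.wrap(wire);
        long seq = buf.getLong();
        long ts = buf.getLong();
        int len = buf.getInt();
        if (len < 0 || len > buf.remaining()) {
            throw new IllegalArgumentException("bad payload length: " + len);
        }
        byte[] payload = new byte[len];
        buf.get(payload);
        return new DemoFrame(seq, ts, payload);
    }
}
```

A receiver can use the sequence number to detect loss or reordering and the timestamp to schedule playback, independent of arrival time.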

Quick Start

Client — connect, ping, open stream
import com.talpie.linker.*;
import java.sql.Timestamp;
import java.util.UUID;

public class ClientMain {
  private static final UUID STREAM_ID = UUID.fromString("11111111-2222-3333-4444-555555555555");
  public static void main(String[] args) {
    var linker = new Linker("localhost", 1234);
    var client = (ClientService) linker.getService();

    client.getListenersHandlers().register(new ClientListeners.Lifecycle() {
      @Override public void onStart(Object caller, Timestamp ts) { System.out.println("started"); }
      @Override public void onError(Object caller, Timestamp ts, Throwable e) { e.printStackTrace(); }
    });
    client.getListenersHandlers().register(new ClientListeners.Ping() {
      @Override public byte[] feedPing(){ return "HELLO".getBytes(java.nio.charset.StandardCharsets.UTF_8); }
      @Override public void onPong(Object c, Timestamp ts, Message m){ System.out.println("pong: " + m.getPayloadString()); }
    });

    linker.start();

    // Control request (async)
    client.sendRequest("#demo/echo", "Linker".getBytes(java.nio.charset.StandardCharsets.UTF_8))
          .thenAccept(reply -> System.out.println("reply: " + reply.getPayloadString()))
          .exceptionally(ex -> { ex.printStackTrace(); return null; });

    // Open a stream (async)
    client.openStreamAsync(STREAM_ID.toString(), /*queue*/16, /*timeoutMs*/4000)
          .thenAccept(sc -> System.out.println("stream ready: " + sc.getStreamId()))
          .exceptionally(ex -> { ex.printStackTrace(); return null; });
  }
}
Server — start, echo control & stream
import com.talpie.linker.*;
import com.talpie.linker.audio.*;
import java.sql.Timestamp;

public class ServerMain {
  public static void main(String[] args) {
    var linker = new Linker(1234);
    var server = (ServerService) linker.getService();

    // Control echo
    server.getListenersHandlers().register(new ServerListeners.ControlMessage() {
      @Override public Message onRequest(Object caller, Timestamp ts, ClientHandler ch, Message m) {
        return m.setResponse(("OK: " + m.getPayloadString()).getBytes(java.nio.charset.StandardCharsets.UTF_8));
      }
    });

    // Stream echo
    server.getListenersHandlers().register(new ServerListeners.StreamMessage() {
      @Override public void streamFrameIn(Object c, Timestamp t, ClientHandler ch, StreamSocketServer ss, StreamFrame f, long onWire) {
        server.sendThroughStream(f.streamId, f);
      }
    });

    linker.start();
  }
}

Data-Channel (auto-open, bulk transfers)

Call sendRequest(...) as usual; Linker may open a short-lived Data-Channel for that request automatically. You handle the channel in a DataSocket listener, and it closes on its own, so there is no manual open or close.

Client — trigger & handle
// Kick off upload via Control-Channel
client.sendRequest("#files/upload", metaBytes)
      .thenAccept(ack -> System.out.println("ACK: " + ack.getPayloadString()))
      .exceptionally(ex -> { ex.printStackTrace(); return null; });

// Auto-opened Data-Channel listener
client.getListenersHandlers().register(new ClientListeners.DataSocket() {
  @Override public void dataOpen(Object caller, java.sql.Timestamp ts, DataSocketClient dc) {
    try (dc) {
      // Send bytes (encrypted, back-pressured)
      // dc.getOutputStream().write(buffer);
    } catch (Exception e) { e.printStackTrace(); }
  }
  @Override public void dataClose(Object caller, java.sql.Timestamp ts, DataSocketClient dc) {
    System.out.println("[DATA] closed");
  }
});
Server — accept & stream
// Control handler acknowledges; Linker will auto-open Data-Channel
server.getListenersHandlers().register(new ServerListeners.ControlMessage() {
  @Override public Message onRequest(Object caller, java.sql.Timestamp ts, ClientHandler ch, Message m) {
    return m.setResponse("OK".getBytes(java.nio.charset.StandardCharsets.UTF_8));
  }
});

// Handle opened Data-Channel
server.getListenersHandlers().register(new ServerListeners.DataSocket() {
  @Override public void dataOpen(Object caller, java.sql.Timestamp ts, ClientHandler ch, DataSocketServer dc) {
    try (dc) {
      // Receive bytes
      // dc.getInputStream().transferTo(fileOut);
    } catch (Exception e) { e.printStackTrace(); }
  }
});

Tip: run multiple bulk requests in parallel — control traffic stays snappy.
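
The commented-out write and transferTo calls above suggest that a Data-Channel exposes ordinary java.io streams. Under that assumption, a chunked copy loop is enough to move a file through it. ChunkedCopy is a hypothetical helper, and the chunk size is up to the caller.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;

/** Hypothetical helper: copies a stream in fixed-size chunks. */
final class ChunkedCopy {
    private ChunkedCopy() {}

    /** Copies everything from in to out in chunks of at most chunkSize bytes; returns total bytes copied. */
    static long copy(InputStream in, OutputStream out, int chunkSize) throws IOException {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        byte[] buffer = new byte[chunkSize];
        long total = 0;
        int n;
        while ((n = in.read(buffer)) != -1) {
            // A blocking write is what lets a slow receiver throttle the sender.
            out.write(buffer, 0, n);
            total += n;
        }
        out.flush();
        return total;
    }
}
```

Inside dataOpen this could be called as ChunkedCopy.copy(fileIn, dc.getOutputStream(), 64 * 1024), assuming getOutputStream() behaves as the commented line implies.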

Ping

Client — feed & handle pong
client.getListenersHandlers().register(new ClientListeners.Ping() {
  @Override public byte[] feedPing(){ return "HELLO".getBytes(java.nio.charset.StandardCharsets.UTF_8); }
  @Override public void onPong(Object c, java.sql.Timestamp ts, Message m){ System.out.println("[pong] " + m.getPayloadString()); }
});
Server — onPing
server.getListenersHandlers().register(new ServerListeners.Ping() {
  @Override public byte[] onPing(Object caller, java.sql.Timestamp ts, ClientHandler ch, byte[] payload) {
    return payload; // echo
  }
});
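
Because the server echoes the ping payload, the payload can carry a send timestamp and the client can derive round-trip time from the pong. The sketch below shows only the encoding and arithmetic; PingClock is a hypothetical helper, and reading the raw payload bytes from Message is assumed to be possible (this document only shows getPayloadString()).

```java
import java.nio.ByteBuffer;

/** Hypothetical helper: carries a send time in the ping payload to measure round-trip time. */
final class PingClock {
    private PingClock() {}

    /** Builds an 8-byte ping payload carrying the given send time in nanoseconds. */
    static byte[] stamp(long sentNanos) {
        return ByteBuffer.allocate(Long.BYTES).putLong(sentNanos).array();
    }

    /** Returns the round-trip time in nanoseconds, given the echoed payload and the receive time. */
    static long rttNanos(byte[] echoedPayload, long receivedNanos) {
        if (echoedPayload.length != Long.BYTES) {
            throw new IllegalArgumentException("expected an 8-byte payload");
        }
        return receivedNanos - ByteBuffer.wrap(echoedPayload).getLong();
    }
}
```

In feedPing() the client would return PingClock.stamp(System.nanoTime()), and in onPong it would pass the echoed bytes together with a fresh System.nanoTime() reading to rttNanos.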

Control — request/response

Client — sendRequest (async)
client.sendRequest("#demo/hello", "Linker".getBytes(java.nio.charset.StandardCharsets.UTF_8))
      .thenAccept(reply -> System.out.println("reply: " + reply.getPayloadString()))
      .exceptionally(ex -> { ex.printStackTrace(); return null; });
Server — handler
server.getListenersHandlers().register(new ServerListeners.ControlMessage() {
  @Override public Message onRequest(Object caller, java.sql.Timestamp ts, ClientHandler ch, Message m) {
    if ("#demo/hello".equals(m.getHeader())) {
      return m.setResponse(("Hi, " + m.getPayloadString() + "!").getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
    return m.setResponse(("Unknown route: " + m.getHeader()).getBytes(java.nio.charset.StandardCharsets.UTF_8));
  }
});
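
Once a handler serves more than a couple of routes, the if-chain above can be replaced by a lookup table. The following is a minimal sketch in plain Java; RouteTable is a hypothetical helper that works on raw bytes and is not a Linker class.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper: maps route headers to payload handlers. */
final class RouteTable {
    private final Map<String, Function<byte[], byte[]>> routes = new ConcurrentHashMap<>();

    /** Registers a handler for a route and returns this table for chaining. */
    RouteTable on(String route, Function<byte[], byte[]> handler) {
        routes.put(route, handler);
        return this;
    }

    /** Runs the handler for the route, or returns an "Unknown route" reply if none is registered. */
    byte[] dispatch(String route, byte[] payload) {
        Function<byte[], byte[]> handler = routes.get(route);
        if (handler == null) {
            return ("Unknown route: " + route).getBytes(StandardCharsets.UTF_8);
        }
        return handler.apply(payload);
    }
}
```

An onRequest implementation could then reduce to a single call that passes m.getHeader() and the request payload to dispatch and wraps the result with m.setResponse(...).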

Server → Client push

Client — handle push route
client.getListenersHandlers().register(new ClientListeners.ControlMessage() {
  @Override public Message onRequest(Object caller, java.sql.Timestamp ts, Message m) {
    if ("#server/push".equals(m.getHeader())) {
      System.out.println("push: " + m.getPayloadString());
      return m.setResponse("OK".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
    return m;
  }
});
Server — fire & async-ack
server.getListenersHandlers().register(new ServerListeners.ControlMessage() {
  @Override public Message onRequest(Object caller, java.sql.Timestamp ts, ClientHandler ch, Message m) {
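    // Push to the client without blocking this handler; the reply arrives asynchronously.
    ch.sendRequest("#server/push", "payload".getBytes(java.nio.charset.StandardCharsets.UTF_8))
      .thenAccept(resp -> System.out.println("pushed, client replied: " + resp.getPayloadString()))
      .exceptionally(ex -> { ex.printStackTrace(); return null; });
    return m.setResponse("ACK".getBytes(java.nio.charset.StandardCharsets.UTF_8)); // fast ack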
  }
});

Broadcast

Client — receive broadcast
client.getListenersHandlers().register(new ClientListeners.ControlMessage() {
  @Override public Message onRequest(Object caller, java.sql.Timestamp ts, Message m) {
    if ("#server/broadcast".equals(m.getHeader())) {
      System.out.println("broadcast: " + m.getPayloadString());
      return m.setResponse("OK".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    }
    return m;
  }
});
Server — fan out (async)
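// Registry of connected clients, keyed by client UUID (type arguments are required for the lambda below to compile)
var clients = new java.util.concurrent.ConcurrentHashMap<java.util.UUID, ClientHandler>();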
server.getListenersHandlers().register(new ServerListeners.Connection() {
  @Override public void onClientConnected(Object c, java.sql.Timestamp ts, ClientHandler ch) {
    clients.put(ch.getClientUuid(), ch);
  }
  @Override public void onClientDisconnected(Object c, java.sql.Timestamp ts, ClientHandler ch, Throwable cause) {
    clients.remove(ch.getClientUuid());
  }
});
void broadcast(String route, byte[] payload) {
  clients.values().forEach(ch -> ch.sendRequest(route, payload).exceptionally(ex -> null));
}
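
The fan-out above discards the replies. If you want them, the per-client futures can be gathered into one map, similar in shape to what broadcastRequest returns. This is a sketch using only java.util.concurrent; FanOut and its send function are hypothetical stand-ins, with send representing something like ch -> ch.sendRequest(route, payload).

```java
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

/** Hypothetical helper: sends to many targets and collects the successful replies. */
final class FanOut {
    private FanOut() {}

    /**
     * Sends to every target via the supplied function and completes once all sends have finished.
     * Failed sends are left out of the result map instead of failing the whole broadcast.
     */
    static <T, R> CompletableFuture<Map<UUID, R>> gather(Map<UUID, T> targets,
                                                         Function<T, CompletableFuture<R>> send) {
        Map<UUID, R> replies = new ConcurrentHashMap<>();
        CompletableFuture<?>[] pending = targets.entrySet().stream()
            .map(e -> send.apply(e.getValue())
                .thenAccept(r -> replies.put(e.getKey(), r))
                .exceptionally(ex -> null)) // swallow per-target failures
            .toArray(CompletableFuture[]::new);
        return CompletableFuture.allOf(pending).thenApply(v -> replies);
    }
}
```

Dropping failed targets keeps one disconnected client from failing the whole broadcast; callers that need to know who failed can compare the result keys against the target keys.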

Stream — realtime

Client — open & receive
client.openStreamAsync(STREAM_ID.toString(), 16, 4000)
      .thenAccept(sc -> System.out.println("stream ready: " + sc.getStreamId()))
      .exceptionally(ex -> { ex.printStackTrace(); return null; });

client.getListenersHandlers().register(new ClientListeners.StreamMessage() {
  @Override public void streamFrame(Object c, java.sql.Timestamp ts, StreamSocketClient sc, StreamFrame f) {
    // handle frames
  }
});
Server — relay/echo
server.getListenersHandlers().register(new ServerListeners.StreamMessage() {
  @Override public void streamFrameIn(Object c, java.sql.Timestamp ts, ClientHandler ch, StreamSocketServer ss, StreamFrame f, long onWire) {
    server.sendThroughStream(f.streamId, f); // echo/relay
  }
});

Realtime Audio (25 ms frames)

Client — mic → stream → speaker
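// Assumed to exist already: inputMixerInfo / outputMixerInfo (the chosen audio devices) and inboundDemux (an InboundDemux).
// 48 kHz, 16-bit, mono, signed, little-endian PCM
var fmt  = new javax.sound.sampled.AudioFormat(48000f, 16, 1, true, false);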
var micQ = new java.util.concurrent.LinkedBlockingQueue<byte[]>(4);
new com.talpie.linker.audio.MicCollector(micQ, inputMixerInfo, fmt).start();

client.openStreamAsync(STREAM_ID.toString(), 16, 4000)
      .thenAccept(sc -> new com.talpie.linker.audio.StreamOutboundSender(client, STREAM_ID, micQ).start());

client.getListenersHandlers().register(new ClientListeners.StreamMessage() {
  @Override public void streamFrame(Object c, java.sql.Timestamp ts, StreamSocketClient sc, StreamFrame f) {
    inboundDemux.accept(java.util.UUID.fromString(f.streamId), f.payload);
  }
});
new com.talpie.linker.audio.SpeakerPlayer(inboundDemux.getQueue(STREAM_ID), outputMixerInfo, fmt).start();
Server — relay or room-mix
server.getListenersHandlers().register(new ServerListeners.StreamMessage() {
  @Override public void streamFrameIn(Object c, java.sql.Timestamp ts, ClientHandler ch, StreamSocketServer ss, StreamFrame f, long onWire) {
    // echo or feed into RoomMixer queues for mix-minus
    server.sendThroughStream(f.streamId, f);
  }
});
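
For reference, the size of one 25 ms frame follows directly from the audio format used above (48 kHz, 16-bit, mono). The helper below is a hypothetical sketch of that arithmetic, not a Linker API.

```java
/** Hypothetical helper: computes PCM frame sizes in bytes. */
final class PcmFrames {
    private PcmFrames() {}

    /** Bytes in one PCM frame of the given duration. */
    static int frameBytes(int sampleRateHz, int bitsPerSample, int channels, int frameMillis) {
        // Samples per channel in the frame, e.g. 48000 * 25 / 1000 = 1200.
        int samplesPerChannel = (int) ((long) sampleRateHz * frameMillis / 1000);
        // Round the sample size up to whole bytes, e.g. 16 bits -> 2 bytes.
        int bytesPerSample = (bitsPerSample + 7) / 8;
        return samplesPerChannel * bytesPerSample * channels;
    }
}
```

For the format in this section, frameBytes(48000, 16, 1, 25) gives 2400 bytes, which matches the (int)(48000 * 0.025) * 2 expression used in the RoomMixer example.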

Group Calls (mix-minus)

Client — standard audio path
// Same as audio example: mic → STREAM_ID, demux → speaker.
// Server returns mix-minus per participant on the same stream id.
Server — RoomMixer
UUID roomId = UUID.randomUUID();
var room = new com.talpie.server.RoomMixer("#call/audio-room:" + roomId, (int)(48000 * 0.025) * 2);
room.start();
room.join(clientHandler, /*senderId*/ senderUuid);
server.getListenersHandlers().register(new ServerListeners.StreamMessage() {
  @Override public void streamFrameIn(Object c, java.sql.Timestamp ts, ClientHandler ch, StreamSocketServer ss, StreamFrame f, long onWire) {
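    // Feed the frame into the input queue of the sender that produced it. With several
    // participants, resolve the sender id from ch instead of using one fixed senderUuid.
    var q = room.getInput(senderUuid);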
    if (q != null) q.offer(f.payload);
  }
});
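
Mix-minus means each participant hears everyone except themselves. A minimal sketch of the arithmetic on 16-bit samples follows; it illustrates the general technique under the assumption that all frames are the same length, and it is not RoomMixer's actual implementation. MixMinus is a hypothetical name.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical helper: computes a mix-minus output for every participant. */
final class MixMinus {
    private MixMinus() {}

    /**
     * For each participant, returns the sum of everyone else's samples, clamped to the 16-bit range.
     * All input frames must have the same length.
     */
    static <K> Map<K, short[]> mix(Map<K, short[]> inputs) {
        int n = inputs.values().stream().mapToInt(a -> a.length).findFirst().orElse(0);
        int[] total = new int[n];
        for (short[] frame : inputs.values()) {
            if (frame.length != n) {
                throw new IllegalArgumentException("frame length mismatch");
            }
            for (int i = 0; i < n; i++) {
                total[i] += frame[i];
            }
        }
        Map<K, short[]> out = new LinkedHashMap<>();
        for (Map.Entry<K, short[]> e : inputs.entrySet()) {
            short[] own = e.getValue();
            short[] mixed = new short[n];
            for (int i = 0; i < n; i++) {
                int v = total[i] - own[i]; // full mix minus the listener's own voice
                mixed[i] = (short) Math.max(Short.MIN_VALUE, Math.min(Short.MAX_VALUE, v));
            }
            out.put(e.getKey(), mixed);
        }
        return out;
    }
}
```

Summing once and subtracting each participant's own frame costs O(participants) per sample rather than O(participants squared), and the clamp avoids wrap-around distortion when several people speak at once.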

Security

Wire-level (summary)

  • Control and Data traffic is carried as Message objects (encrypted ASCII header + binary payload).
  • Stream handshake: #_START-STREAM-SOCKET → #_STREAM-SOCKET-READY, then framed StreamFrame traffic.
  • Data handshake: #_START-DATA-SOCKET → #_DATA-SOCKET-READY, then bulk flow and auto-close.
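
The "fresh IV/nonce per message" property can be illustrated with the JDK's own crypto API. The sketch below assumes AES-GCM with a 12-byte random nonce prepended to the ciphertext; this document does not state which AES mode or message layout Linker uses, so treat both as assumptions. NonceSealer is a hypothetical name.

```java
import java.nio.ByteBuffer;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

/** Hypothetical helper: AES-GCM encryption with a new random nonce for every message. */
final class NonceSealer {
    private static final int NONCE_BYTES = 12;
    private static final int TAG_BITS = 128;
    private static final SecureRandom RNG = new SecureRandom();

    private NonceSealer() {}

    /** Encrypts with a fresh random nonce and returns nonce followed by ciphertext and tag. */
    static byte[] seal(SecretKey key, byte[] plaintext) throws GeneralSecurityException {
        byte[] nonce = new byte[NONCE_BYTES];
        RNG.nextBytes(nonce);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, nonce));
        byte[] ciphertext = cipher.doFinal(plaintext);
        return ByteBuffer.allocate(nonce.length + ciphertext.length).put(nonce).put(ciphertext).array();
    }

    /** Reads the nonce from the front of the sealed message, then decrypts and authenticates the rest. */
    static byte[] open(SecretKey key, byte[] sealed) throws GeneralSecurityException {
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, sealed, 0, NONCE_BYTES));
        return cipher.doFinal(sealed, NONCE_BYTES, sealed.length - NONCE_BYTES);
    }
}
```

Because the nonce differs on every call, sealing the same plaintext twice yields different bytes, so an observer cannot tell that two messages carry identical content.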

API Surface (most used)

com.talpie.linker.Linker — bootstrap (client/server)
ClientService, ServerService — main services
Message — request/response; encrypted header+payload
StreamSocketClient, StreamSocketServer — stream endpoints
StreamFrame — seq + timestamp + payload
Audio: MicCollector, SpeakerPlayer, InboundDemux, StreamOutboundSender
Server add-on: RoomMixer (mix-minus)

Key methods (ClientService)

// Requests (always async)
java.util.concurrent.CompletableFuture<Message> sendRequest(String route, byte[] data);
java.util.concurrent.CompletableFuture<Message> sendRequest(String route, byte[] data, long timeoutMillis);
java.util.concurrent.CompletableFuture<Message> sendControlRequest(String route, byte[] data, long timeoutMillis);

// Streams
java.util.concurrent.CompletableFuture<StreamSocketClient> openStreamAsync(String streamId, int queue, long timeoutMs);
void sendThroughStream(String streamId, StreamFrame frame);
java.util.concurrent.CompletableFuture<Void> closeStreamAsync(String streamId, long timeoutMs);
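
Since every call above returns a CompletableFuture, callers can layer their own deadline handling on top with standard JDK methods. The sketch below shows a fallback-on-timeout wrapper; Deadlines is a hypothetical helper, and how Linker's own timeoutMillis parameters behave on expiry is not described in this document.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical helper: caller-side deadline handling for any CompletableFuture. */
final class Deadlines {
    private Deadlines() {}

    /** Returns a future that yields the fallback if the source has not completed in time. */
    static <T> CompletableFuture<T> withFallback(CompletableFuture<T> source, long timeoutMillis, T fallback) {
        // copy() leaves the caller's original future untouched (Java 9+).
        return source.copy().completeOnTimeout(fallback, timeoutMillis, TimeUnit.MILLISECONDS);
    }
}
```

For example, wrapping a sendRequest(...) future this way lets a UI show a default value after a short wait while the original request is still free to complete later.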

Key methods (ServerService)

// Targeted requests (async)
java.util.concurrent.CompletableFuture<Message> sendRequestTo(java.util.UUID clientId, String route, byte[] data, long timeoutMillis);
java.util.concurrent.CompletableFuture<Message> sendControlRequestTo(java.util.UUID clientId, String route, byte[] data, long timeoutMillis);

// Broadcast helper
java.util.concurrent.CompletableFuture<java.util.Map<java.util.UUID, Message>> broadcastRequest(String route, byte[] data, long timeoutMillis);

// Streams
java.util.concurrent.CompletableFuture<StreamSocketServer> openStreamFor(java.util.UUID clientId, String streamId, int queue, long timeoutMs);
void sendThroughStream(String streamId, StreamFrame frame);
java.util.concurrent.CompletableFuture<Void> closeStreamFor(java.util.UUID clientId, String streamId, long timeoutMs);

Requirements

  • Java 17 or later.
  • linker-1.0.0.jar on the classpath.

License

Provided under CC BY-ND 4.0.
Commercial use allowed with attribution; no derivative works.
Full text: https://creativecommons.org/licenses/by-nd/4.0/

Get Started

  1. Add linker-1.0.0.jar to your classpath.
  2. Run the server sample.
  3. Run the client sample (see ping/pong).
  4. Push 25 ms frames on a Stream-Channel — or fire a bulk request and let the Data-Channel auto-open.