public class UNICAST extends Protocol implements AgeOutCache.Handler<Address>
This layer is used to reliably transmit point-to-point messages, that is, either messages sent to a single receiver (as opposed to messages multicast to a group) or, for example, replies to a multicast message. The sender uses an AckSenderWindow, which retransmits messages for which it hasn't received an ACK; the receiver uses an AckReceiverWindow, which keeps track of the lowest seqno received so far and keeps messages in order.
Messages are eventually removed from both AckSenderWindows and AckReceiverWindows. A message is removed from an AckSenderWindow when an ACK has been received for it. Messages are removed from an AckReceiverWindow whenever a message is received: the new message is added, and then as many messages as possible are removed (until we stop at a gap or there are no more messages).
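The receiver-side behavior described above (add the new message, then remove messages in order until a gap is reached) can be illustrated with a minimal sketch. `ReceiverWindowSketch` and its method names are hypothetical and are not part of JGroups; payloads are plain strings for brevity.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

// Hypothetical stand-in for a receiver window; NOT the JGroups AckReceiverWindow class.
class ReceiverWindowSketch {
    private final TreeMap<Long, String> buffered = new TreeMap<>(); // seqno -> payload
    private long nextToDeliver; // lowest seqno that has not been delivered yet

    ReceiverWindowSketch(long firstSeqno) {
        this.nextToDeliver = firstSeqno;
    }

    /** Adds a message, then removes as many in-order messages as possible. */
    List<String> addAndRemove(long seqno, String payload) {
        if (seqno >= nextToDeliver) {
            // Messages below nextToDeliver were already delivered; ignore such duplicates.
            buffered.putIfAbsent(seqno, payload);
        }
        List<String> deliverable = new ArrayList<>();
        String next;
        // Stop at the first gap, or when nothing is buffered any more.
        while ((next = buffered.remove(nextToDeliver)) != null) {
            deliverable.add(next);
            nextToDeliver++;
        }
        return deliverable;
    }
}
```

With a window starting at seqno 1, adding seqno 2 first returns nothing (there is a gap at 1); adding seqno 1 afterwards returns both messages in order.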
Modifier and Type | Class and Description |
---|---|
protected class |
UNICAST.ConnectionReaper |
protected static class |
UNICAST.ReceiverEntry |
protected class |
UNICAST.RetransmitTask
Retransmitter task which periodically (every xmit_interval ms) looks at all the retransmit (send) tables and
re-sends messages for which we haven't received an ack yet
|
protected class |
UNICAST.SenderEntry |
static class |
UNICAST.UnicastHeader
The following types and fields are serialized:
| DATA | seqno | conn_id | first |
| ACK | seqno |
| SEND_FIRST_SEQNO |
|
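The UNICAST.UnicastHeader entry above lists which fields are serialized for each header type. A rough sketch of that idea follows; the type codes, field order, and byte encoding here are assumptions made for illustration and do not claim to match the actual JGroups wire format. `HeaderSketch` is a hypothetical class.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical illustration; type codes and encoding are assumptions, not the JGroups format.
class HeaderSketch {
    static final byte DATA = 0, ACK = 1, SEND_FIRST_SEQNO = 2;

    final byte type;
    final long seqno;
    final short connId;
    final boolean first;

    HeaderSketch(byte type, long seqno, short connId, boolean first) {
        this.type = type;
        this.seqno = seqno;
        this.connId = connId;
        this.first = first;
    }

    /** Writes the type, followed only by the fields that the type carries. */
    byte[] serialize() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeByte(type);
            switch (type) {
                case DATA:              // DATA | seqno | conn_id | first
                    out.writeLong(seqno);
                    out.writeShort(connId);
                    out.writeBoolean(first);
                    break;
                case ACK:               // ACK | seqno
                    out.writeLong(seqno);
                    break;
                case SEND_FIRST_SEQNO:  // type only, no further fields
                    break;
                default:
                    throw new IllegalArgumentException("unknown type " + type);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }
}
```

Under these assumed encodings, a DATA header is larger than an ACK header, and a SEND_FIRST_SEQNO header consists of the type byte alone.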
Modifier and Type | Field and Description |
---|---|
protected AgeOutCache<Address> |
cache |
protected long |
conn_expiry_timeout |
protected java.util.concurrent.Future<?> |
connection_reaper |
static long |
DEFAULT_FIRST_SEQNO |
protected short |
last_conn_id |
protected Address |
local_addr |
protected int |
max_msg_batch_size |
protected long |
max_retransmit_time |
protected java.util.List<Address> |
members |
protected long |
num_acks_received |
protected long |
num_acks_sent |
protected long |
num_msgs_received |
protected long |
num_msgs_sent |
protected long |
num_xmits |
protected java.util.concurrent.ConcurrentMap<Address,UNICAST.ReceiverEntry> |
recv_table |
protected java.util.concurrent.locks.ReentrantLock |
recv_table_lock |
protected boolean |
running |
protected int |
segment_capacity
Deprecated.
|
protected java.util.concurrent.ConcurrentMap<Address,UNICAST.SenderEntry> |
send_table |
protected int[] |
timeout
Deprecated.
|
protected TimeScheduler |
timer |
protected long |
xmit_interval |
protected long |
xmit_table_max_compaction_time |
protected int |
xmit_table_msgs_per_row |
protected int |
xmit_table_num_rows |
protected double |
xmit_table_resize_factor |
protected java.util.concurrent.Future<?> |
xmit_task
RetransmitTask running every xmit_interval ms
|
Constructor and Description |
---|
UNICAST() |
Modifier and Type | Method and Description |
---|---|
java.lang.Object |
down(Event evt)
An event is to be sent down the stack.
|
java.util.Map<java.lang.String,java.lang.Object> |
dumpStats() |
void |
expired(Address key)
Called by AgeOutCache to remove expired connections
|
AgeOutCache<Address> |
getAgeOutCache() |
int |
getAgeOutCacheSize() |
java.lang.String |
getLocalAddress() |
long |
getMaxRetransmitTime() |
java.lang.String |
getMembers() |
protected short |
getNewConnectionId() |
long |
getNumAcksReceived() |
long |
getNumAcksSent() |
int |
getNumberOfMessagesInReceiveWindows() |
int |
getNumConnections() |
long |
getNumMessagesReceived() |
long |
getNumMessagesSent() |
int |
getNumReceiveConnections() |
int |
getNumSendConnections() |
int |
getNumUnackedMessages()
The number of messages in all Entry.sent_msgs tables (haven't received an ACK yet)
|
long |
getNumXmits() |
protected UNICAST.ReceiverEntry |
getOrCreateReceiverEntry(Address sender,
long seqno,
short conn_id) |
protected UNICAST.ReceiverEntry |
getReceiverEntry(Address sender,
long seqno,
boolean first,
short conn_id) |
int[] |
getTimeout() |
long |
getXmitTableMissingMessages() |
int |
getXmitTableNumCompactions() |
int |
getXmitTableNumMoves() |
int |
getXmitTableNumPurges() |
int |
getXmitTableNumResizes() |
long |
getXmitTableUndeliveredMessages() |
protected void |
handleAckReceived(Address sender,
long seqno,
short conn_id) |
protected void |
handleBatchReceived(Address sender,
java.util.Map<java.lang.Short,java.util.List<Message>> map) |
protected void |
handleDataReceived(Address sender,
long seqno,
short conn_id,
boolean first,
Message msg,
Event evt)
Check whether the hashtable contains an entry e for
sender (create if not). |
protected void |
handleResendingOfFirstMessage(Address sender,
long seqno)
We need to resend our first message with our conn_id
|
protected void |
handleUpEvent(Address sender,
UNICAST.UnicastHeader hdr) |
boolean |
hasSendConnectionTo(Address dest)
Used for testing only
|
boolean |
isConnectionReaperRunning() |
boolean |
isXmitTaskRunning() |
java.lang.String |
printAgeOutCache() |
java.lang.String |
printConnections() |
protected java.lang.String |
printMessageList(java.util.List<Message> list) |
java.lang.String |
printReceiveWindowMessages() |
java.lang.String |
printSendWindowMessages() |
void |
reapIdleConnections() |
void |
removeAllConnections()
This method is public only so that it can be invoked by unit tests; it should not otherwise be used!
|
protected int |
removeAndDeliver(java.util.concurrent.atomic.AtomicBoolean processing,
Table<Message> win,
Address sender)
Try to remove as many messages as possible from the table and pass them up.
|
void |
removeConnection(Address mbr)
Removes and resets from connection table (which is already locked).
|
void |
removeReceiveConnection(Address mbr) |
void |
removeSendConnection(Address mbr) |
void |
resetStats() |
void |
retransmit(Message msg)
Called by AckSenderWindow to resend messages for which no ACK has been received yet
|
protected void |
sendAck(Address dst,
long seqno,
short conn_id) |
protected void |
sendRequestForFirstSeqno(Address dest,
long seqno_received) |
void |
setMaxMessageBatchSize(int size) |
void |
setMaxRetransmitTime(long max_retransmit_time) |
void |
setTimeout(int[] val)
Deprecated.
|
void |
start()
This method is called on a
Channel.connect(String) . |
protected void |
startConnectionReaper() |
protected void |
startRetransmitTask() |
void |
stop()
This method is called on a
Channel.disconnect() . |
protected void |
stopConnectionReaper() |
protected void |
stopRetransmitTask() |
java.lang.Object |
up(Event evt)
An event was received from the layer below.
|
void |
up(MessageBatch batch)
Sends up multiple messages in a
MessageBatch . |
Methods inherited from class Protocol: accept, destroy, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, init, isErgonomics, level, parse, printStats, providedDownServices, providedUpServices, requiredDownServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setSocketFactory, setUpProtocol, setValue, setValues, statsEnabled
public static final long DEFAULT_FIRST_SEQNO
@Deprecated protected int[] timeout
protected int max_msg_batch_size
protected long conn_expiry_timeout
@Deprecated protected int segment_capacity
protected int xmit_table_num_rows
protected int xmit_table_msgs_per_row
protected double xmit_table_resize_factor
protected long xmit_table_max_compaction_time
protected long max_retransmit_time
protected long xmit_interval
protected long num_msgs_sent
protected long num_msgs_received
protected long num_acks_sent
protected long num_acks_received
protected long num_xmits
protected final java.util.concurrent.ConcurrentMap<Address,UNICAST.SenderEntry> send_table
protected final java.util.concurrent.ConcurrentMap<Address,UNICAST.ReceiverEntry> recv_table
protected final java.util.concurrent.locks.ReentrantLock recv_table_lock
protected java.util.concurrent.Future<?> xmit_task
protected volatile java.util.List<Address> members
protected Address local_addr
protected TimeScheduler timer
protected volatile boolean running
protected short last_conn_id
protected AgeOutCache<Address> cache
protected java.util.concurrent.Future<?> connection_reaper
public int[] getTimeout()
@Deprecated public void setTimeout(int[] val)
public void setMaxMessageBatchSize(int size)
public java.lang.String getLocalAddress()
public java.lang.String getMembers()
public boolean isConnectionReaperRunning()
public int getNumSendConnections()
public int getNumReceiveConnections()
public int getNumConnections()
public java.lang.String printConnections()
public long getNumMessagesSent()
public long getNumMessagesReceived()
public long getNumAcksSent()
public long getNumAcksReceived()
public long getNumXmits()
public long getMaxRetransmitTime()
public void setMaxRetransmitTime(long max_retransmit_time)
public boolean isXmitTaskRunning()
public int getAgeOutCacheSize()
public java.lang.String printAgeOutCache()
public AgeOutCache<Address> getAgeOutCache()
public boolean hasSendConnectionTo(Address dest)
public int getNumUnackedMessages()
public int getNumberOfMessagesInReceiveWindows()
public long getXmitTableUndeliveredMessages()
public long getXmitTableMissingMessages()
public int getXmitTableNumCompactions()
public int getXmitTableNumMoves()
public int getXmitTableNumResizes()
public int getXmitTableNumPurges()
public java.lang.String printReceiveWindowMessages()
public java.lang.String printSendWindowMessages()
public void resetStats()
Overrides: resetStats in class Protocol
public java.util.Map<java.lang.String,java.lang.Object> dumpStats()
public void start() throws java.lang.Exception
Description copied from class Protocol: This method is called on a Channel.connect(String). Starts work. Protocols are connected and queues are ready to receive events. Will be called from bottom to top. This call will replace the START and START_OK events.
Overrides: start in class Protocol
Throws: java.lang.Exception - Thrown if the protocol cannot be started successfully. This will cause the ProtocolStack to fail, so Channel.connect(String) will throw an exception.
public void stop()
Description copied from class Protocol: This method is called on a Channel.disconnect(). Stops work (e.g. by closing the multicast socket). Will be called from top to bottom. This means that at the time of the method invocation the neighbor protocol below is still working. This method will replace the STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that when this method is called, all messages in the down queue will have been flushed.
public java.lang.Object up(Event evt)
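Description copied from class Protocol: An event was received from the layer below. After the layer has examined it, the event is either a) discarded, or b) an event is sent down the stack using down_prot.down(), or c) the event (or another event) is sent up the stack using up_prot.up().
protected void handleUpEvent(Address sender, UNICAST.UnicastHeader hdr)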
public void up(MessageBatch batch)
Description copied from class Protocol: Sends up multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed messages, although the transport itself will create initial MessageBatches that contain only either OOB or regular messages.
The default processing below sends messages up the stack individually, based on a matching criterion (calling Protocol.accept(Message)), and, if true, calls Protocol.up(Event) for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.
Subclasses should check if there are any messages destined for them (e.g. using MessageBatch.getMatchingMessages(short, boolean)), then possibly remove and process them and finally pass the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all encrypted messages in the batch, not remove them, and pass the batch up when done.
public java.lang.Object down(Event evt)
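Description copied from class Protocol: An event is to be sent down the stack. The layer may process the event before sending it down the stack using down_prot.down(). In case of a GET_ADDRESS event (which tries to retrieve the stack's address from one of the bottom layers), the layer may need to send a new response event back up the stack using up_prot.up().
public void removeConnection(Address mbr)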
public void removeSendConnection(Address mbr)
public void removeReceiveConnection(Address mbr)
public void removeAllConnections()
public void retransmit(Message msg)
public void expired(Address key)
Called by AgeOutCache to remove expired connections.
Specified by: expired in interface AgeOutCache.Handler<Address>
Parameters:
key -
protected void handleDataReceived(Address sender, long seqno, short conn_id, boolean first, Message msg, Event evt)
Check whether the hashtable contains an entry e for sender (create if not). If e.received_msgs is null and first is true: create a new AckReceiverWindow(seqno) and add the message. Set e.received_msgs to the new window. Else just add the message.
protected void handleBatchReceived(Address sender, java.util.Map<java.lang.Short,java.util.List<Message>> map)
protected int removeAndDeliver(java.util.concurrent.atomic.AtomicBoolean processing, Table<Message> win, Address sender)
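The `processing` flag in the signature above suggests that only one thread at a time removes and delivers messages for a given sender. The following is a simplified sketch of that pattern, not the actual method body: a `Queue<String>` stands in for `Table<Message>`, delivery is modeled as appending to a list, and `DeliverySketch` is a hypothetical name.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch; a Queue replaces Table<Message> and a List replaces "passing up".
class DeliverySketch {
    /**
     * Removes as many messages as possible and delivers them, unless another
     * thread is already doing so. Returns the number of delivered messages.
     */
    static int removeAndDeliver(AtomicBoolean processing,
                                Queue<String> deliverable,
                                List<String> delivered) {
        // Only the thread that flips the flag from false to true may deliver.
        if (!processing.compareAndSet(false, true)) {
            return 0;
        }
        int count = 0;
        try {
            String msg;
            while ((msg = deliverable.poll()) != null) {
                delivered.add(msg);
                count++;
            }
        } finally {
            processing.set(false); // let the next caller take over
        }
        return count;
    }
}
```

In this simplified version, a message that arrives just as the loop ends may wait until the next call; closing that window requires coordinating the flag reset with the table itself, which the sketch leaves out.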
protected UNICAST.ReceiverEntry getReceiverEntry(Address sender, long seqno, boolean first, short conn_id)
protected UNICAST.ReceiverEntry getOrCreateReceiverEntry(Address sender, long seqno, short conn_id)
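The handleDataReceived description (look up the entry for the sender, and create a new window starting at seqno only when the message is flagged as first) can be sketched as below. This is an illustration under assumptions: `ReceiverTableSketch` and `Entry` are hypothetical, senders are plain strings, conn_id mismatch handling is omitted, and returning null for a non-first message without an entry is a guess at how a caller might decide to request the first seqno.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch of a receive table keyed by sender; not the JGroups implementation.
class ReceiverTableSketch {
    static final class Entry {
        final long firstSeqno; // seqno at which the receive window starts
        final short connId;

        Entry(long firstSeqno, short connId) {
            this.firstSeqno = firstSeqno;
            this.connId = connId;
        }
    }

    private final ConcurrentMap<String, Entry> recvTable = new ConcurrentHashMap<>();

    /**
     * Returns the entry for the sender. An entry is created only if the message
     * is flagged as the first one; otherwise null is returned (assumption: the
     * caller would then ask the sender for its first seqno).
     */
    Entry getReceiverEntry(String sender, long seqno, boolean first, short connId) {
        Entry entry = recvTable.get(sender);
        if (entry != null) {
            return entry;
        }
        if (!first) {
            return null;
        }
        Entry created = new Entry(seqno, connId);
        Entry existing = recvTable.putIfAbsent(sender, created);
        return existing != null ? existing : created; // another thread may have won the race
    }
}
```

Using `putIfAbsent` keeps the creation race-free without holding a lock across the whole lookup.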
protected void handleAckReceived(Address sender, long seqno, short conn_id)
protected void handleResendingOfFirstMessage(Address sender, long seqno)
We need to resend our first message with our conn_id.
Parameters:
sender -
seqno - Resend the non-null messages in the range [lowest .. seqno]
protected void startRetransmitTask()
protected void stopRetransmitTask()
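The sender side described in this class (keep unacknowledged messages, drop a message when its ACK arrives, and periodically resend what is left every xmit_interval ms) might be sketched as follows. `RetransmitSketch` is hypothetical: it uses a plain `ScheduledExecutorService` instead of the JGroups `TimeScheduler`, string payloads instead of `Message`, and records resent messages in a list instead of sending them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a sender table with a periodic retransmit task.
class RetransmitSketch {
    private final Map<Long, String> unacked = new ConcurrentSkipListMap<>(); // seqno -> payload
    private final List<String> resent = Collections.synchronizedList(new ArrayList<>());
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private Future<?> xmitTask;

    void sent(long seqno, String msg) { unacked.put(seqno, msg); }

    /** An ACK removes exactly the acknowledged message from the send table. */
    void ackReceived(long seqno) { unacked.remove(seqno); }

    /** One retransmission pass: "resend" every message that is still unacknowledged. */
    void retransmitUnacked() { resent.addAll(unacked.values()); }

    List<String> resent() { return new ArrayList<>(resent); }

    synchronized void startRetransmitTask(long xmitIntervalMs) {
        if (xmitTask == null || xmitTask.isDone()) {
            xmitTask = timer.scheduleWithFixedDelay(this::retransmitUnacked,
                    xmitIntervalMs, xmitIntervalMs, TimeUnit.MILLISECONDS);
        }
    }

    synchronized void stopRetransmitTask() {
        if (xmitTask != null) {
            xmitTask.cancel(false);
            xmitTask = null;
        }
    }

    synchronized boolean isXmitTaskRunning() { return xmitTask != null && !xmitTask.isDone(); }

    void shutdown() { timer.shutdownNow(); }
}
```

Keeping the task handle as a `Future<?>` mirrors the `xmit_task` field listed above and makes starting and stopping idempotent.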
protected void sendAck(Address dst, long seqno, short conn_id)
protected void startConnectionReaper()
protected void stopConnectionReaper()
protected short getNewConnectionId()
protected void sendRequestForFirstSeqno(Address dest, long seqno_received)
public void reapIdleConnections()
protected java.lang.String printMessageList(java.util.List<Message> list)