public class NAKACK2 extends Protocol implements DiagnosticsHandler.ProbeHandler
限定符和类型 | 类和说明 |
---|---|
protected static class |
NAKACK2.Counter |
protected class |
NAKACK2.RetransmitTask
Retransmitter task which periodically (every xmit_interval ms) looks at all the retransmit tables and
sends retransmit request to all members from which we have missing messages
|
限定符和类型 | 字段和说明 |
---|---|
protected BoundedList<Message> |
become_server_queue |
protected int |
become_server_queue_size |
protected BoundedList<java.lang.String> |
digest_history
Keeps a bounded list of the last N digest sets
|
protected boolean |
discard_delivered_msgs
Messages that have been received in order are sent up the stack (= delivered to the application).
|
protected static Filter<Message> |
dont_loopback_filter |
protected static Message |
DUMMY_OOB_MSG |
protected boolean |
is_server |
protected boolean |
leaving |
protected Address |
local_addr |
protected boolean |
log_discard_msgs
If true, logs messages discarded because received from other members
|
protected boolean |
log_not_found_msgs |
protected int |
max_msg_batch_size |
protected long |
max_rebroadcast_timeout |
protected java.util.List<Address> |
members |
protected Filter<Message> |
no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs |
protected int |
num_messages_received |
protected int |
num_messages_sent |
protected static int |
NUM_REBROADCAST_MSGS |
protected boolean |
print_stability_history_on_failed_xmit
已过时。
|
protected Digest |
rebroadcast_digest |
protected java.util.concurrent.locks.Lock |
rebroadcast_digest_lock |
protected java.util.concurrent.locks.Condition |
rebroadcast_done |
protected java.util.concurrent.locks.Lock |
rebroadcast_lock |
protected boolean |
rebroadcasting |
protected boolean |
running |
protected BoundedList<java.lang.String> |
stability_msgs
Keeps the last N stability messages
|
protected SuppressLog<Address> |
suppress_log_non_member
Log to suppress identical warnings for messages from non-members
|
protected long |
suppress_time_non_member_warnings |
protected TimeScheduler |
timer |
protected boolean |
use_mcast_xmit
Retransmit messages using multicast rather than unicast.
|
protected boolean |
use_mcast_xmit_req
Use a multicast to request retransmission of missing messages.
|
protected View |
view |
protected boolean |
xmit_from_random_member
Ask a random member for retransmission of a missing message.
|
protected long |
xmit_interval |
protected java.util.concurrent.atomic.AtomicLong |
xmit_reqs_received |
protected java.util.concurrent.atomic.AtomicLong |
xmit_reqs_sent |
protected java.util.concurrent.atomic.AtomicLong |
xmit_rsps_received |
protected java.util.concurrent.atomic.AtomicLong |
xmit_rsps_sent |
protected java.util.concurrent.ConcurrentMap<Address,Table<Message>> |
xmit_table
Map to store sent and received messages (keyed by sender)
|
protected long |
xmit_table_max_compaction_time |
protected int |
xmit_table_msgs_per_row |
protected int |
xmit_table_num_rows |
protected double |
xmit_table_resize_factor |
protected java.util.concurrent.Future<?> |
xmit_task
RetransmitTask running every xmit_interval ms
|
protected java.util.Map<Address,java.lang.Long> |
xmit_task_map
Used by the retransmit task to keep the last retransmitted seqno per sender (https://issues.jboss.org/browse/JGRP-1539)
|
构造器和说明 |
---|
NAKACK2() |
限定符和类型 | 方法和说明 |
---|---|
protected void |
adjustReceivers(java.util.List<Address> members)
Removes old members from xmit-table and adds new members to xmit-table (at seqnos hd=0, hr=0).
|
protected void |
cancelRebroadcasting() |
protected void |
checkForRebroadcasts() |
void |
clearNonMemberCache() |
void |
compact() |
protected Table<Message> |
createTable(long initial_seqno) |
protected void |
deliver(Message msg,
Address sender,
long seqno,
java.lang.String error_msg) |
protected void |
deliverBatch(MessageBatch batch) |
java.lang.Object |
down(Event evt)
Callback.
|
java.util.Map<java.lang.String,java.lang.Object> |
dumpStats() |
java.lang.String |
dumpXmitTablesNumCurrentRows() |
protected void |
flushBecomeServerQueue()
Flushes the queue.
|
int |
getBecomeServerQueueSizeActual() |
long |
getCurrentSeqno() |
Digest |
getDigest()
Returns a message digest: for each member P the highest delivered and received seqno is added
|
Digest |
getDigest(Address mbr) |
boolean |
getLogDiscardMessages() |
int |
getNonMemberMessages() |
long |
getSizeOfAllMessages() |
long |
getSizeOfAllMessagesInclHeaders() |
Table<Message> |
getWindow(Address sender)
Returns the receive window for sender; only used for testing.
|
long |
getXmitRequestsReceived() |
long |
getXmitRequestsSent() |
long |
getXmitResponsesReceived() |
long |
getXmitResponsesSent() |
long |
getXmitTableCapacity() |
int |
getXmitTableMissingMessages() |
int |
getXmitTableNumCompactions() |
int |
getXmitTableNumCurrentRows() |
int |
getXmitTableNumMoves() |
int |
getXmitTableNumPurges() |
int |
getXmitTableNumResizes() |
int |
getXmitTableUndeliveredMsgs() |
protected void |
handleMessage(Message msg,
NakAckHeader2 hdr)
Finds the corresponding retransmit buffer and adds the message to it (according to seqno).
|
protected void |
handleMessages(Address dest,
Address sender,
java.util.List<Tuple<java.lang.Long,Message>> msgs,
boolean oob,
AsciiString cluster_name) |
java.util.Map<java.lang.String,java.lang.String> |
handleProbe(java.lang.String... keys)
Handles a probe.
|
protected void |
handleXmitReq(Address xmit_requester,
SeqnoList missing_msgs,
Address original_sender)
Retransmits messages first_seqno to last_seqno from original_sender from xmit_table to xmit_requester,
called when XMIT_REQ is received.
|
protected void |
handleXmitRsp(Message msg,
NakAckHeader2 hdr) |
void |
init()
Called after instance has been created (null constructor) and before protocol is started.
|
boolean |
isDiscardDeliveredMsgs() |
protected static boolean |
isGreaterThanOrEqual(Digest first,
Digest other)
Returns true if all senders of the current digest have their seqnos >= the ones from other
|
boolean |
isUseMcastXmit() |
boolean |
isXmitFromRandomMember() |
boolean |
isXmitTaskRunning() |
protected void |
mergeDigest(Digest digest)
For all members of the digest, adjust the retransmit buffers in xmit_table.
|
protected Message |
msgFromXmitRsp(Message msg,
NakAckHeader2 hdr) |
protected void |
overwriteDigest(Digest digest)
Overwrites existing entries, but does NOT remove entries not found in the digest
|
java.lang.String |
printDigestHistory() |
java.lang.String |
printMessages() |
java.lang.String |
printStabilityHistory() |
java.lang.String |
printStabilityMessages() |
java.lang.String |
printStats() |
java.util.List<java.lang.Integer> |
providedUpServices()
List of events that are provided to layers above (they will be handled when sent down from above)
|
protected void |
queueMessage(Message msg,
long seqno) |
protected void |
rebroadcastMessages()
Takes the argument highest_seqnos and compares it to the current digest.
|
protected void |
removeAndPassUp(Table<Message> buf,
Address sender,
boolean loopback,
AsciiString cluster_name)
Efficient way of checking whether another thread is already processing messages from sender.
|
protected void |
reset() |
void |
resetStats() |
protected void |
retransmit(long first_seqno,
long last_seqno,
Address sender) |
protected void |
retransmit(long first_seqno,
long last_seqno,
Address sender,
boolean multicast_xmit_request) |
protected void |
retransmit(SeqnoList missing_msgs,
Address sender,
boolean multicast_xmit_request) |
protected void |
send(Event evt,
Message msg)
Adds the message to the sent_msgs table and then passes it down the stack.
|
protected void |
sendXmitRsp(Address dest,
Message msg)
Sends a message msg to the requester.
|
protected void |
setDigest(Digest digest)
Creates a retransmit buffer for each sender in the digest according to the sender's seqno.
|
protected void |
setDigest(Digest digest,
boolean merge)
Sets or merges the digest.
|
void |
setDiscardDeliveredMsgs(boolean discard_delivered_msgs) |
void |
setLogDiscardMessages(boolean flag) |
void |
setLogNotFoundMessages(boolean flag) |
void |
setTimer(TimeScheduler timer)
Only used for unit tests, don't use !
|
void |
setUseMcastXmit(boolean use_mcast_xmit) |
void |
setUseMcastXmitReq(boolean flag) |
void |
setXmitFromRandomMember(boolean xmit_from_random_member) |
protected static long |
sizeOfAllMessages(Table<Message> buf,
boolean include_headers) |
protected void |
stable(Digest digest)
Garbage collect messages that have been seen by all members.
|
void |
start()
This method is called on a
Channel.connect(String) . |
protected void |
startRetransmitTask() |
void |
stop()
This method is called on a
Channel.disconnect() . |
protected void |
stopRetransmitTask() |
java.lang.String[] |
supportedKeys()
Returns a list of supported keys
|
void |
triggerXmit() |
protected void |
unknownMember(Address sender,
java.lang.Object message) |
java.lang.Object |
up(Event evt)
Callback.
|
void |
up(MessageBatch batch)
Sends up multiple messages in a
MessageBatch. |
accept, destroy, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, providedDownServices, requiredDownServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setSocketFactory, setUpProtocol, setValue, setValues, statsEnabled
protected static final int NUM_REBROADCAST_MSGS
protected int max_msg_batch_size
protected boolean use_mcast_xmit
protected boolean use_mcast_xmit_req
protected boolean xmit_from_random_member
protected boolean discard_delivered_msgs
protected long max_rebroadcast_timeout
@Deprecated protected boolean print_stability_history_on_failed_xmit
protected boolean log_discard_msgs
protected boolean log_not_found_msgs
protected long xmit_interval
protected int xmit_table_num_rows
protected int xmit_table_msgs_per_row
protected double xmit_table_resize_factor
protected long xmit_table_max_compaction_time
protected int become_server_queue_size
protected long suppress_time_non_member_warnings
protected int num_messages_sent
protected int num_messages_received
protected static final Message DUMMY_OOB_MSG
protected final Filter<Message> no_dummy_and_no_oob_delivered_msgs_and_no_dont_loopback_msgs
protected final java.util.concurrent.atomic.AtomicLong xmit_reqs_received
protected final java.util.concurrent.atomic.AtomicLong xmit_reqs_sent
protected final java.util.concurrent.atomic.AtomicLong xmit_rsps_received
protected final java.util.concurrent.atomic.AtomicLong xmit_rsps_sent
protected volatile boolean is_server
protected Address local_addr
protected volatile java.util.List<Address> members
protected volatile View view
protected final java.util.concurrent.ConcurrentMap<Address,Table<Message>> xmit_table
protected java.util.concurrent.Future<?> xmit_task
protected final java.util.Map<Address,java.lang.Long> xmit_task_map
protected volatile boolean leaving
protected volatile boolean running
protected TimeScheduler timer
protected final java.util.concurrent.locks.Lock rebroadcast_lock
protected final java.util.concurrent.locks.Condition rebroadcast_done
protected volatile boolean rebroadcasting
protected final java.util.concurrent.locks.Lock rebroadcast_digest_lock
protected Digest rebroadcast_digest
protected final BoundedList<java.lang.String> stability_msgs
protected final BoundedList<java.lang.String> digest_history
protected BoundedList<Message> become_server_queue
protected SuppressLog<Address> suppress_log_non_member
public boolean isXmitTaskRunning()
public int getNonMemberMessages()
public void clearNonMemberCache()
public long getXmitRequestsReceived()
public long getXmitRequestsSent()
public long getXmitResponsesReceived()
public long getXmitResponsesSent()
public boolean isUseMcastXmit()
public boolean isXmitFromRandomMember()
public boolean isDiscardDeliveredMsgs()
public boolean getLogDiscardMessages()
public void setUseMcastXmit(boolean use_mcast_xmit)
public void setUseMcastXmitReq(boolean flag)
public void setLogDiscardMessages(boolean flag)
public void setLogNotFoundMessages(boolean flag)
public void setXmitFromRandomMember(boolean xmit_from_random_member)
public void setDiscardDeliveredMsgs(boolean discard_delivered_msgs)
public int getBecomeServerQueueSizeActual()
public Table<Message> getWindow(Address sender)
public void setTimer(TimeScheduler timer)
public int getXmitTableUndeliveredMsgs()
public int getXmitTableMissingMessages()
public long getXmitTableCapacity()
public int getXmitTableNumCurrentRows()
public long getSizeOfAllMessages()
public long getSizeOfAllMessagesInclHeaders()
public int getXmitTableNumCompactions()
public int getXmitTableNumMoves()
public int getXmitTableNumResizes()
public int getXmitTableNumPurges()
public java.lang.String printMessages()
public long getCurrentSeqno()
public java.lang.String printStabilityMessages()
public java.lang.String printDigestHistory()
public void compact()
public java.lang.String dumpXmitTablesNumCurrentRows()
public void resetStats()
resetStats
在类中 Protocol
public void init() throws java.lang.Exception
Protocol
public java.util.Map<java.lang.String,java.lang.Object> dumpStats()
public java.lang.String printStats()
printStats
在类中 Protocol
public java.lang.String printStabilityHistory()
public java.util.List<java.lang.Integer> providedUpServices()
Protocol
providedUpServices
在类中 Protocol
public void start() throws java.lang.Exception
Protocol
Channel.connect(String)
. Starts work.
Protocols are connected and queues are ready to receive events.
Will be called from bottom to top. This call will replace
the START and START_OK events.
start
在类中 Protocol
java.lang.Exception
- Thrown if protocol cannot be started successfully. This will cause the ProtocolStack
to fail, so Channel.connect(String)
will throw an exception
public void stop()
Protocol
Channel.disconnect()
. Stops work (e.g. by closing multicast socket).
Will be called from top to bottom. This means that at the time of the method invocation the
neighbor protocol below is still working. This method will replace the
STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that
when this method is called all messages in the down queue will have been flushed
public java.lang.Object down(Event evt)
Do not use down_prot.down()
in this
method as the event is passed down by default by the superclass after this method returns !
public java.lang.Object up(Event evt)
Do not use PassUp
in this
method as the event is passed up by default by the superclass after this method returns !
public void up(MessageBatch batch)
Protocol
MessageBatch
. The sender of the batch is always the same, and so is the
destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed
messages, although the transport itself will create initial MessageBatches that contain only either OOB or
regular messages.
The default processing below sends messages up the stack individually, based on a matching criterion
(calling Protocol.accept(Message)
), and - if true - calls Protocol.up(Event)
for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.
Subclasses should check if there are any messages destined for them (e.g. using
MessageBatch.getMatchingMessages(short, boolean)
), then possibly remove and process them and finally pass
the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all
encrypted messages in the batch, not remove them, and pass the batch up when done.
public java.util.Map<java.lang.String,java.lang.String> handleProbe(java.lang.String... keys)
DiagnosticsHandler.ProbeHandler
handleProbe
在接口中 DiagnosticsHandler.ProbeHandler
public java.lang.String[] supportedKeys()
DiagnosticsHandler.ProbeHandler
supportedKeys
在接口中 DiagnosticsHandler.ProbeHandler
protected void queueMessage(Message msg, long seqno)
protected void unknownMember(Address sender, java.lang.Object message)
protected void send(Event evt, Message msg)
protected void handleMessage(Message msg, NakAckHeader2 hdr)
protected void handleMessages(Address dest, Address sender, java.util.List<Tuple<java.lang.Long,Message>> msgs, boolean oob, AsciiString cluster_name)
protected void removeAndPassUp(Table<Message> buf, Address sender, boolean loopback, AsciiString cluster_name)
protected void handleXmitReq(Address xmit_requester, SeqnoList missing_msgs, Address original_sender)
xmit_requester
- The sender of the XMIT_REQ, we have to send the requested copy of the message to this address
missing_msgs
- A list of seqnos that have to be retransmitted
original_sender
- The member who originally sent the message. Guaranteed to be non-null
protected void deliverBatch(MessageBatch batch)
protected void flushBecomeServerQueue()
ClientGmsImpl.installView(View, Digest)
method (called when a view is installed).
protected void cancelRebroadcasting()
protected void sendXmitRsp(Address dest, Message msg)
dest
-
msg
-
protected void handleXmitRsp(Message msg, NakAckHeader2 hdr)
protected Message msgFromXmitRsp(Message msg, NakAckHeader2 hdr)
protected void rebroadcastMessages()
protected void checkForRebroadcasts()
protected static boolean isGreaterThanOrEqual(Digest first, Digest other)
protected void adjustReceivers(java.util.List<Address> members)
public Digest getDigest()
protected void setDigest(Digest digest)
protected void mergeDigest(Digest digest)
protected void overwriteDigest(Digest digest)
digest
-
protected void setDigest(Digest digest, boolean merge)
digest
- The digest
merge
- Whether to merge the new digest with our own, or not
protected void stable(Digest digest)
protected void retransmit(long first_seqno, long last_seqno, Address sender)
protected void retransmit(long first_seqno, long last_seqno, Address sender, boolean multicast_xmit_request)
protected void retransmit(SeqnoList missing_msgs, Address sender, boolean multicast_xmit_request)
protected void reset()
protected static long sizeOfAllMessages(Table<Message> buf, boolean include_headers)
protected void startRetransmitTask()
protected void stopRetransmitTask()
public void triggerXmit()