public class STABLE extends Protocol
Works as follows: periodically (desired_avg_gossip), or after having received a given number of bytes (max_bytes), every member sends its digest (highest seqno delivered, highest seqno received) to the cluster (send_stable_msgs_to_coord_only=false) or to the current coordinator (send_stable_msgs_to_coord_only=true).
When such a message is received, the recipient updates a stability vector, which maintains the highest seqno delivered/received for each member and is initially empty. When messages from all members have been received, a stability message is multicast, which causes all members to send a STABLE event down the stack (triggering garbage collection in the NAKACK{2,3} layer). When send_stable_msgs_to_coord_only is true, far fewer messages are exchanged, as members don't multicast STABLE messages but instead send them only to the coordinator.
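The stability-vector bookkeeping described above can be illustrated with a small, self-contained sketch. It is not the JGroups implementation: the class `StabilityVectorSketch`, its method names, and the use of plain strings for member addresses are all hypothetical. It also assumes that a message is safe to purge once every member has delivered it, so the sketch keeps the minimum of the reported highest-delivered seqnos per sender.

```java
import java.util.BitSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration of a stability vector; not JGroups code. */
class StabilityVectorSketch {
    private final List<String> members;                        // current view, in rank order
    private final Map<String, Long> minDelivered = new HashMap<>();
    private final BitSet heardFrom;                            // one bit per member rank

    StabilityVectorSketch(List<String> members) {
        this.members = members;
        this.heardFrom = new BitSet(members.size());
    }

    /**
     * Merges the digest reported by one member.
     * Returns true once a digest has been received from every member of the view.
     */
    boolean onStableMessage(String sender, Map<String, Long> highestDelivered) {
        int rank = members.indexOf(sender);
        if (rank < 0)
            return false;                                      // sender is not in the view: ignore
        for (Map.Entry<String, Long> e : highestDelivered.entrySet())
            minDelivered.merge(e.getKey(), e.getValue(), Math::min);
        heardFrom.set(rank);
        return heardFrom.cardinality() == members.size();
    }

    /** Seqnos that (under the assumption above) every member has delivered. */
    Map<String, Long> stableDigest() {
        return new HashMap<>(minDelivered);
    }
}
```

Once `onStableMessage` returns true, the caller would multicast the result of `stableDigest()` as the stability message and start a new round with an empty vector.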
Modifier and Type | Class and Description |
---|---|
protected class | STABLE.ResumeTask |
protected class | STABLE.StabilitySendTask: Multicasts a STABILITY message |
static class | STABLE.StableHeader |
protected class | STABLE.StableTask: Mcast periodic STABLE message. |
Modifier and Type | Field and Description |
---|---|
protected double | cap: Deprecated. |
protected Address | coordinator |
protected long | desired_avg_gossip: Sends a STABLE gossip every 20 seconds on average. 0 disables gossiping of STABLE messages |
protected MutableDigest | digest |
protected boolean | initialized |
protected Address | local_addr |
protected java.util.concurrent.locks.Lock | lock |
protected long | max_bytes: Total amount of bytes from incoming messages (default = 0 = disabled). |
protected static long | MAX_SUSPEND_TIME |
protected long | num_bytes_received: The total number of bytes received from unicast and multicast messages |
protected int | num_stability_msgs_received |
protected int | num_stability_msgs_sent |
protected int | num_stable_msgs_received |
protected int | num_stable_msgs_sent |
protected java.util.concurrent.locks.Lock | received |
protected java.util.concurrent.Future<?> | resume_task_future |
protected java.lang.Object | resume_task_mutex |
protected boolean | send_stable_msgs_to_coord_only |
protected long | stability_delay: Delay before we send a STABILITY msg (give others a chance to send first). |
protected java.util.concurrent.locks.Lock | stability_lock |
protected java.util.concurrent.Future<?> | stability_task_future |
protected java.util.concurrent.Future<?> | stable_task_future |
protected java.util.concurrent.locks.Lock | stable_task_lock |
protected boolean | suspended: When true, don't take part in garbage collection: neither send STABLE messages nor handle STABILITY messages |
protected TimeScheduler | timer |
protected View | view |
protected FixedSizeBitSet | votes: Keeps track of who we already heard from (STABLE_GOSSIP msgs). |
Constructor and Description |
---|
STABLE() |
Modifier and Type | Method and Description |
---|---|
protected boolean | addVote(int rank): Adds mbr to votes and returns true if we have all the votes, otherwise false. |
protected static boolean | allVotesReceived(FixedSizeBitSet votes): Votes is already locked and guaranteed to be non-null |
java.lang.Object | down(Event evt): An event is to be sent down the stack. |
void | gc() |
long | getBytes() |
long | getDesiredAverageGossip() |
protected Digest | getDigest() |
long | getMaxBytes() |
protected static int | getRank(Address member, View v) |
int | getStabilityReceived() |
int | getStabilitySent() |
int | getStableReceived() |
int | getStableSent() |
boolean | getStableTaskRunning() |
protected void | handleRegularMessage(Message msg) |
protected void | handleStabilityMessage(Digest stable_digest, Address sender, ViewId view_id) |
protected void | handleStableMessage(Digest d, Address sender, ViewId view_id): Digest d contains (a) the highest seqnos deliverable for each sender and (b) the highest seqnos seen for each member. |
protected void | handleUpEvent(STABLE.StableHeader hdr, Address sender, Digest digest) |
protected void | handleViewChange(View v) |
void | init(): Called after instance has been created (null constructor) and before protocol is started. |
static Buffer | marshal(Digest digest) |
protected java.lang.String | printDigest(Digest digest) |
protected Digest | readDigest(byte[] buffer, int offset, int length) |
java.util.List<java.lang.Integer> | requiredDownServices(): List of events that are required to be answered by some layer below |
protected void | resetDigest() |
protected void | resetNumBytes() |
void | resetStats() |
protected void | resume() |
protected void | sendStabilityMessage(Digest tmp, ViewId view_id): Schedules a stability message to be mcast after a random number of milliseconds (range 1-5 secs). |
protected void | sendStableMessage(boolean send_in_background): Broadcasts a STABLE message of the current digest to all members (or the coordinator only). |
void | setDesiredAverageGossip(long gossip_interval) |
void | setMaxBytes(long max_bytes) |
void | start(): This method is called on a Channel.connect(String). |
protected void | startResumeTask(long max_suspend_time) |
protected void | startStabilityTask(Digest d, ViewId view_id, long delay) |
protected void | startStableTask() |
void | stop(): This method is called on a Channel.disconnect(). |
protected void | stopResumeTask() |
protected void | stopStabilityTask() |
protected void | stopStableTask() |
protected void | suspend(long timeout) |
java.lang.Object | up(Event evt): An event was received from the layer below. |
void | up(MessageBatch batch): Sends up multiple messages in a MessageBatch. |
protected void | updateLocalDigest(Digest d, Address sender): Update my own digest from a digest received by somebody else. |
accept, destroy, dumpStats, enableStats, getConfigurableObjects, getDownProtocol, getDownServices, getId, getIdsAbove, getLevel, getName, getProtocolStack, getSocketFactory, getThreadFactory, getTransport, getUpProtocol, getUpServices, getValue, isErgonomics, level, parse, printStats, providedDownServices, providedUpServices, requiredUpServices, resetStatistics, setDownProtocol, setErgonomics, setId, setLevel, setProtocolStack, setSocketFactory, setUpProtocol, setValue, setValues, statsEnabled
protected static final long MAX_SUSPEND_TIME
protected long desired_avg_gossip
protected long stability_delay
max_bytes is used
protected long max_bytes
num_bytes_received reset to 0. If this is > 0, then ideally stability_delay should be set to a low number as well
@Deprecated protected double cap
protected boolean send_stable_msgs_to_coord_only
protected int num_stable_msgs_sent
protected int num_stable_msgs_received
protected int num_stability_msgs_sent
protected int num_stability_msgs_received
protected Address local_addr
protected volatile View view
protected volatile MutableDigest digest
protected FixedSizeBitSet votes
protected final java.util.concurrent.locks.Lock lock
protected java.util.concurrent.Future<?> stability_task_future
protected final java.util.concurrent.locks.Lock stability_lock
protected java.util.concurrent.Future<?> stable_task_future
protected final java.util.concurrent.locks.Lock stable_task_lock
protected TimeScheduler timer
protected long num_bytes_received
protected final java.util.concurrent.locks.Lock received
protected volatile boolean suspended
protected boolean initialized
protected java.util.concurrent.Future<?> resume_task_future
protected final java.lang.Object resume_task_mutex
protected volatile Address coordinator
public long getDesiredAverageGossip()
public void setDesiredAverageGossip(long gossip_interval)
public long getMaxBytes()
public void setMaxBytes(long max_bytes)
public long getBytes()
public int getStableSent()
public int getStableReceived()
public int getStabilitySent()
public int getStabilityReceived()
public boolean getStableTaskRunning()
public void resetStats()
resetStats
in class Protocol
public java.util.List<java.lang.Integer> requiredDownServices()
Protocol
requiredDownServices
in class Protocol
protected void suspend(long timeout)
protected void resume()
public void init() throws java.lang.Exception
Protocol
public void start() throws java.lang.Exception
Protocol
This method is called on a Channel.connect(String). Starts work.
Protocols are connected and queues are ready to receive events.
Will be called from bottom to top. This call will replace
the START and START_OK events.
start
in class Protocol
java.lang.Exception - Thrown if protocol cannot be started successfully. This will cause the ProtocolStack to fail, so Channel.connect(String) will throw an exception
public void stop()
Protocol
This method is called on a Channel.disconnect(). Stops work (e.g. by closing multicast socket).
Will be called from top to bottom. This means that at the time of the method invocation the
neighbor protocol below is still working. This method will replace the
STOP, STOP_OK, CLEANUP and CLEANUP_OK events. The ProtocolStack guarantees that
when this method is called all messages in the down queue will have been flushed
public java.lang.Object up(Event evt)
Protocol
down_prot.down() or c) the event (or another event) is sent up the stack using up_prot.up().
protected void handleUpEvent(STABLE.StableHeader hdr, Address sender, Digest digest)
public void up(MessageBatch batch)
Protocol
Sends up multiple messages in a MessageBatch. The sender of the batch is always the same, and so is the
destination (null == multicast messages). Messages in a batch can be OOB messages, regular messages, or mixed
messages, although the transport itself will create initial MessageBatches that contain only either OOB or
regular messages.
The default processing below sends messages up the stack individually, based on a matching criterion
(calling Protocol.accept(Message)), and - if true - calls Protocol.up(Event)
for that message and removes the message. If the batch is not empty, it is passed up, or else it is dropped.
Subclasses should check if there are any messages destined for them (e.g. using
MessageBatch.getMatchingMessages(short, boolean)), then possibly remove and process them and finally pass
the batch up to the next protocol. Protocols can also modify messages in place, e.g. ENCRYPT could decrypt all
encrypted messages in the batch, not remove them, and pass the batch up when done.
protected void handleRegularMessage(Message msg)
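The byte-count trigger mentioned for max_bytes and num_bytes_received (and presumably driven from handleRegularMessage) could look roughly like the following sketch. This is an illustration under assumptions, not the JGroups source: the class `ByteThresholdSketch` and its method names are hypothetical, and it assumes a STABLE message is due as soon as the running total reaches the threshold, after which the counter is reset to 0.

```java
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;

/** Hypothetical illustration of a max_bytes style trigger; not JGroups code. */
class ByteThresholdSketch {
    private final long maxBytes;                 // 0 (or less) disables the trigger
    private long numBytesReceived;               // guarded by 'received'
    private final Lock received = new ReentrantLock();

    ByteThresholdSketch(long maxBytes) {
        this.maxBytes = maxBytes;
    }

    /**
     * Accounts for one incoming message of the given length.
     * Returns true if the caller should now send a STABLE message.
     */
    boolean onMessage(int length) {
        if (maxBytes <= 0)
            return false;                        // byte-based triggering is disabled
        received.lock();
        try {
            numBytesReceived += length;
            if (numBytesReceived < maxBytes)
                return false;
            numBytesReceived = 0;                // reset, as the field description states
            return true;
        } finally {
            received.unlock();
        }
    }

    long bytes() {
        received.lock();
        try {
            return numBytesReceived;
        } finally {
            received.unlock();
        }
    }
}
```

The lock mirrors the `received` lock field listed for this class; whether the real protocol holds it across the same operations is not stated in this document.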
public java.lang.Object down(Event evt)
Protocol
down_prot.down(). In case of a GET_ADDRESS event (which tries to
retrieve the stack's address from one of the bottom layers), the layer may need to send
a new response event back up the stack using up_prot.up().
public void gc()
protected void handleViewChange(View v)
protected void updateLocalDigest(Digest d, Address sender)
protected void resetDigest()
protected boolean addVote(int rank)
rank -
protected static boolean allVotesReceived(FixedSizeBitSet votes)
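The vote tracking behind addVote and allVotesReceived can be sketched with a plain `java.util.BitSet` standing in for FixedSizeBitSet. The class `VoteTrackerSketch` is hypothetical, and the behavior for out-of-range ranks is an assumption; the sketch only follows the documented contract that adding a vote returns true once all votes are in.

```java
import java.util.BitSet;

/** Hypothetical illustration of per-rank vote tracking; not JGroups code. */
class VoteTrackerSketch {
    private final BitSet votes;   // bit i is set once the member with rank i has voted
    private final int size;       // number of members in the current view

    VoteTrackerSketch(int viewSize) {
        this.size = viewSize;
        this.votes = new BitSet(viewSize);
    }

    /** Records the vote of the member with the given rank; true if all members have voted. */
    synchronized boolean addVote(int rank) {
        if (rank < 0 || rank >= size)
            return false;         // assumption: unknown ranks are ignored
        votes.set(rank);
        return allVotesReceived();
    }

    /** True when every rank in the view has its bit set. */
    synchronized boolean allVotesReceived() {
        return votes.cardinality() == size;
    }

    /** Starts a new round, e.g. after a view change or a completed stability round. */
    synchronized void reset() {
        votes.clear();
    }
}
```

A bit set indexed by rank keeps the per-round state small and makes duplicate STABLE messages from the same member harmless.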
protected void startStableTask()
protected void stopStableTask()
protected void startResumeTask(long max_suspend_time)
protected void stopResumeTask()
protected void stopStabilityTask()
protected void handleStableMessage(Digest d, Address sender, ViewId view_id)
protected void resetNumBytes()
protected void handleStabilityMessage(Digest stable_digest, Address sender, ViewId view_id)
protected void sendStableMessage(boolean send_in_background)
protected Digest readDigest(byte[] buffer, int offset, int length)
protected void sendStabilityMessage(Digest tmp, ViewId view_id)
tmp - A copy of the stability digest, so we don't need to copy it again
protected Digest getDigest()
protected java.lang.String printDigest(Digest digest)
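The randomized delay described for sendStabilityMessage (a stability message scheduled after a random number of milliseconds, giving other members a chance to send first) might be sketched as follows. Everything here is hypothetical: the class `StabilityDelaySketch`, the use of a `ScheduledExecutorService` in place of TimeScheduler, and the idea of cancelling the pending task when another member's STABILITY message arrives first are assumptions made for illustration.

```java
import java.util.Random;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Hypothetical illustration of a randomly delayed, cancellable send; not JGroups code. */
class StabilityDelaySketch {
    private final ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
    private final Random random = new Random();
    private ScheduledFuture<?> pending;          // guarded by 'this'

    /** Picks a delay in [minMillis, maxMillis], both inclusive. */
    static long randomDelay(int minMillis, int maxMillis, Random rnd) {
        return minMillis + (long) rnd.nextInt(maxMillis - minMillis + 1);
    }

    /** Schedules the send after a random delay, replacing any send still pending. */
    synchronized void schedule(int minMillis, int maxMillis, Runnable sendStability) {
        cancel();
        long delay = randomDelay(minMillis, maxMillis, random);
        pending = timer.schedule(sendStability, delay, TimeUnit.MILLISECONDS);
    }

    /** Cancels the pending send; returns true if a not-yet-run task was cancelled. */
    synchronized boolean cancel() {
        boolean cancelled = pending != null && pending.cancel(false);
        pending = null;
        return cancelled;
    }

    void shutdown() {
        timer.shutdownNow();
    }
}
```

For the documented range of 1-5 seconds, a caller would use `schedule(1000, 5000, task)`. Spreading the delays makes it likely that one member sends first and the others can cancel, which avoids every member multicasting the same stability message.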