public class DefaultBroadcaster extends Object implements Broadcaster
The default Broadcaster implementation. Broadcasts messages to suspended responses using the caller's Thread. This basic Broadcaster uses an ExecutorService to broadcast messages, hence the broadcast operation is asynchronous. Block on broadcast(Object).get() if you need synchronous operation.
Modifier and Type | Class and Description |
---|---|
protected static class | DefaultBroadcaster.AsyncWriteToken |
static class | DefaultBroadcaster.WriteQueue |
Nested classes inherited from interface Broadcaster: Broadcaster.POLICY, Broadcaster.SCOPE
Fields inherited from interface Broadcaster: ROOT_MASTER
Constructor and Description |
---|
DefaultBroadcaster() |
public static final int POLLING_DEFAULT
public static final String CACHED
protected final ConcurrentLinkedQueue<AtmosphereResource> resources
protected BroadcasterConfig bc
protected final BlockingQueue<Deliver> messages
protected Collection<BroadcasterListener> broadcasterListeners
protected final AtomicBoolean started
protected final AtomicBoolean initialized
protected final AtomicBoolean destroyed
protected Broadcaster.SCOPE scope
protected String name
protected final ConcurrentLinkedQueue<Deliver> delayedBroadcast
protected final ConcurrentLinkedQueue<Deliver> broadcastOnResume
protected final ConcurrentLinkedQueue<BroadcasterLifeCyclePolicyListener> lifeCycleListeners
protected final ConcurrentHashMap<String,DefaultBroadcaster.WriteQueue> writeQueues
protected final DefaultBroadcaster.WriteQueue uniqueWriteQueue
protected final AtomicInteger dispatchThread
protected Future<?>[] notifierFuture
protected Future<?>[] asyncWriteFuture
protected URI uri
protected AtmosphereConfig config
protected int writeTimeoutInSecond
protected int waitTime
protected boolean sharedListeners
protected boolean candidateForPoolable
protected final String usingTokenIdForAttribute
public Broadcaster initialize(String name, URI uri, AtmosphereConfig config)
Specified by: initialize in interface Broadcaster
Parameters: config - an AtmosphereConfig
public Broadcaster initialize(String name, AtmosphereConfig config)
protected BroadcasterConfig createBroadcasterConfig(AtmosphereConfig config)
Creates the BroadcasterConfig.
Parameters: config - the AtmosphereConfig
Returns: a BroadcasterConfig
public void destroy()
Destroys this instance and shuts down its associated ExecutorService.
Specified by: destroy in interface Broadcaster
public Collection<AtmosphereResource> getAtmosphereResources()
Returns a Collection of AtmosphereResource.
Specified by: getAtmosphereResources in interface Broadcaster
Returns: the Collection of AtmosphereResource associated with this Broadcaster.
See Also: Broadcaster.addAtmosphereResource(AtmosphereResource)
public void setScope(Broadcaster.SCOPE scope)
Specified by: setScope in interface Broadcaster
Parameters: scope - the Broadcaster.SCOPE to set.
public Broadcaster.SCOPE getScope()
Returns the Broadcaster.SCOPE.
Specified by: getScope in interface Broadcaster
Returns: the Broadcaster.SCOPE of this Broadcaster.
public void setID(String id)
Sets the ID of this Broadcaster.
Specified by: setID in interface Broadcaster
Parameters: id - ID of this Broadcaster
public Broadcaster rename(String id)
Renames this Broadcaster without involving its BroadcasterFactory. Use this method carefully: it can easily create a memory leak, because the Broadcaster won't be removed from its BroadcasterFactory.
Parameters: id - the new name
public String getID()
Returns the ID of this Broadcaster.
Specified by: getID in interface Broadcaster
Returns: the ID of this Broadcaster
public void resumeAll()
Resumes all suspended responses (AtmosphereResource) added via Broadcaster.addAtmosphereResource(org.atmosphere.cpr.AtmosphereResource).
Specified by: resumeAll in interface Broadcaster
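public void releaseExternalResources()
Releases external resources associated with this Broadcaster. This is useful when a Broadcaster no longer has any suspended AtmosphereResource and some resources (like database connections, JMS queues, etc.) need to be closed.
Specified by: releaseExternalResources in interface Broadcaster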
public void setBroadcasterLifeCyclePolicy(BroadcasterLifeCyclePolicy lifeCyclePolicy)
Sets the BroadcasterLifeCyclePolicy. Make sure you select the right policy to avoid unexpected situations.
Specified by: setBroadcasterLifeCyclePolicy in interface Broadcaster
Parameters: lifeCyclePolicy - a BroadcasterLifeCyclePolicy
public BroadcasterLifeCyclePolicy getBroadcasterLifeCyclePolicy()
Returns the BroadcasterLifeCyclePolicy.
Specified by: getBroadcasterLifeCyclePolicy in interface Broadcaster
Returns: the BroadcasterLifeCyclePolicy
public void addBroadcasterLifeCyclePolicyListener(BroadcasterLifeCyclePolicyListener b)
Specified by: addBroadcasterLifeCyclePolicyListener in interface Broadcaster
Parameters: b - a BroadcasterLifeCyclePolicyListener
public void removeBroadcasterLifeCyclePolicyListener(BroadcasterLifeCyclePolicyListener b)
Removes a BroadcasterLifeCyclePolicyListener.
Specified by: removeBroadcasterLifeCyclePolicyListener in interface Broadcaster
Parameters: b - a BroadcasterLifeCyclePolicyListener
public boolean isDestroyed()
Returns true if this Broadcaster has been destroyed.
Specified by: isDestroyed in interface Broadcaster
Returns: true if this Broadcaster has been destroyed
public Future<Object> awaitAndBroadcast(Object t, long time, TimeUnit timeUnit)
Waits for an available AtmosphereResource before broadcasting. This method blocks until Broadcaster.addAtmosphereResource(AtmosphereResource) is invoked.
Specified by: awaitAndBroadcast in interface Broadcaster
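The blocking behavior described for awaitAndBroadcast can be illustrated with a stand-alone sketch that uses only JDK classes. AwaitSubscriberSketch, subscribe and the boolean return value are hypothetical names and choices made for this example. It is not the Atmosphere implementation, and returning false on timeout is an assumption, since the timeout behavior of DefaultBroadcaster is not described here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a broadcaster; not the Atmosphere class.
final class AwaitSubscriberSketch {
    // Each "subscriber" is just a list that collects delivered messages.
    private final List<List<Object>> subscribers = new CopyOnWriteArrayList<>();
    private final CountDownLatch firstSubscriber = new CountDownLatch(1);

    List<Object> subscribe() {
        List<Object> inbox = new CopyOnWriteArrayList<>();
        subscribers.add(inbox);
        firstSubscriber.countDown(); // releases a waiting awaitAndBroadcast call
        return inbox;
    }

    // Waits up to the given time for a first subscriber, then delivers.
    // Assumption: returns false if nobody subscribed in time.
    boolean awaitAndBroadcast(Object msg, long time, TimeUnit unit) {
        try {
            if (!firstSubscriber.await(time, unit)) {
                return false;
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        for (List<Object> inbox : subscribers) {
            inbox.add(msg);
        }
        return true;
    }

    // A subscriber arrives 50 ms after the broadcaster starts waiting.
    static boolean demoWithLateSubscriber() {
        AwaitSubscriberSketch b = new AwaitSubscriberSketch();
        Thread late = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException ignored) {
                Thread.currentThread().interrupt();
            }
            b.subscribe();
        });
        late.start();
        return b.awaitAndBroadcast("hi", 5, TimeUnit.SECONDS);
    }

    // Nobody subscribes, so the wait times out.
    static boolean demoWithoutSubscriber() {
        return new AwaitSubscriberSketch().awaitAndBroadcast("hi", 50, TimeUnit.MILLISECONDS);
    }

    public static void main(String[] args) {
        System.out.println(demoWithLateSubscriber());
        System.out.println(demoWithoutSubscriber());
    }
}
```

The latch is counted down only after the subscriber has been added, so the waiting thread always sees at least one inbox when it wakes up.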
public Broadcaster addBroadcasterListener(BroadcasterListener b)
Adds a BroadcasterListener.
Specified by: addBroadcasterListener in interface Broadcaster
Parameters: b - a BroadcasterListener
public Broadcaster removeBroadcasterListener(BroadcasterListener b)
Removes a BroadcasterListener.
Specified by: removeBroadcasterListener in interface Broadcaster
Parameters: b - a BroadcasterListener
protected Runnable getBroadcastHandler()
protected Runnable getAsyncWriteHandler(DefaultBroadcaster.WriteQueue writeQueue)
protected void start()
protected void spawnReactor()
protected void killReactiveThreads()
protected int reactiveThreadsCount()
protected void push(Deliver deliver)
protected void deliverPush(Deliver deliver, boolean rec)
protected boolean endBroadcast(Deliver deliver, AtmosphereResource r, CacheMessage cacheMsg, boolean deliverMessage)
protected void queueWriteIO(AtmosphereResource r, Deliver deliver, AtomicInteger count) throws InterruptedException
Throws: InterruptedException
protected void executeBlockingWrite(AtmosphereResource r, Deliver deliver, AtomicInteger count) throws InterruptedException
Throws: InterruptedException
protected boolean perRequestFilter(AtmosphereResource r, Deliver msg)
protected void executeAsyncWrite(DefaultBroadcaster.AsyncWriteToken token)
protected boolean cacheMessageOnIOException(Throwable cause)
protected boolean checkCachedAndPush(AtmosphereResource r, AtmosphereResourceEvent e)
protected boolean retrieveTrackedBroadcast(AtmosphereResource r, AtmosphereResourceEvent e)
protected void invokeOnStateChange(AtmosphereResource r, AtmosphereResourceEvent e)
protected void prepareInvokeOnStateChange(AtmosphereResource r, AtmosphereResourceEvent e)
public void onException(Throwable t, AtmosphereResource ar)
public void onException(Throwable t, AtmosphereResource ar, boolean notifyAndCache)
public void cacheLostMessage(AtmosphereResource r, boolean force)
Parameters: r - AtmosphereResource
public void cacheLostMessage(AtmosphereResource r, DefaultBroadcaster.AsyncWriteToken token)
Parameters: r - AtmosphereResource
public void cacheLostMessage(AtmosphereResource r, DefaultBroadcaster.AsyncWriteToken token, boolean force)
Parameters: r - AtmosphereResource
public void setSuspendPolicy(long maxSuspendResource, Broadcaster.POLICY policy)
Sets the maximum number of suspended AtmosphereResources. If the maximum is reached, Atmosphere will either resume an AtmosphereResource using Broadcaster.POLICY.FIFO (first in, first out) or reject the AtmosphereResource using Broadcaster.POLICY.REJECT. By default the number is unlimited.
Specified by: setSuspendPolicy in interface Broadcaster
Parameters: maxSuspendResource - maximum number of suspended AtmosphereResource
public Future<Object> broadcast(Object msg)
Broadcasts the Object to all suspended responses, i.e. invokes AtmosphereHandler.onStateChange(org.atmosphere.cpr.AtmosphereResourceEvent).
Specified by: broadcast in interface Broadcaster
Parameters: msg - the Object to be broadcast
Returns: a Future that can be used to synchronize using Future.get()
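To make the asynchronous contract of broadcast(Object) concrete, here is a minimal stand-alone sketch of an ExecutorService-backed broadcast that returns a Future, together with a caller that blocks on Future.get(). AsyncBroadcastSketch, subscribe and broadcastAndWait are hypothetical names for this illustration. The sketch uses only JDK classes and does not reproduce the internals of DefaultBroadcaster.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a broadcaster; not the Atmosphere class.
final class AsyncBroadcastSketch {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    // Each "subscriber" is just a list that collects delivered messages.
    private final List<List<Object>> subscribers = new CopyOnWriteArrayList<>();

    List<Object> subscribe() {
        List<Object> inbox = new CopyOnWriteArrayList<>();
        subscribers.add(inbox);
        return inbox;
    }

    // Returns immediately; delivery happens on the executor thread.
    Future<Object> broadcast(Object msg) {
        return executor.submit(() -> {
            for (List<Object> inbox : subscribers) {
                inbox.add(msg);
            }
            return msg;
        });
    }

    void shutdown() {
        executor.shutdown();
    }

    // Broadcasts one message and blocks until delivery has finished.
    static List<Object> broadcastAndWait(Object msg) {
        AsyncBroadcastSketch b = new AsyncBroadcastSketch();
        List<Object> inbox = b.subscribe();
        try {
            // Without get(), the inbox might still be empty at this point.
            b.broadcast(msg).get(5, TimeUnit.SECONDS);
            return inbox;
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            b.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(broadcastAndWait("hello")); // prints [hello]
    }
}
```

A bounded get(timeout, unit) is used here instead of the unbounded get() so that a stalled delivery cannot block the caller forever; that is a choice of this sketch, not a requirement stated by the API.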
protected BroadcasterFuture<Object> futureDone(Object msg)
protected void dispatchMessages(Deliver e)
protected Object filter(Object msg)
Invokes the BroadcastFilter.
Parameters: msg -
public Future<Object> broadcast(Object msg, AtmosphereResource r)
Broadcasts the Object to all suspended responses, i.e. invokes AtmosphereHandler.onStateChange(org.atmosphere.cpr.AtmosphereResourceEvent) with an instance of AtmosphereResource, representing a single suspended response.
Specified by: broadcast in interface Broadcaster
Parameters: msg - the Object to be broadcast
r - an AtmosphereResource
Returns: a Future that can be used to synchronize using Future.get()
public Future<Object> broadcastOnResume(Object msg)
Broadcasts the Object when an AtmosphereResource is resumed by a timeout or when using AtmosphereResource.resume().
Specified by: broadcastOnResume in interface Broadcaster
Parameters: msg - the Object to be broadcast
Returns: a Future that can be used to synchronize using Future.get()
protected void broadcastOnResume(AtmosphereResource r)
public Future<Object> broadcast(Object msg, Set<AtmosphereResource> subset)
Broadcasts the Object to all suspended responses, i.e. invokes AtmosphereHandler.onStateChange(org.atmosphere.cpr.AtmosphereResourceEvent) with a Set of AtmosphereResource, representing a set of AtmosphereHandler.
Specified by: broadcast in interface Broadcaster
Parameters: msg - the Object to be broadcast
subset - a Set of AtmosphereResource
Returns: a Future that can be used to synchronize using Future.get()
public Broadcaster addAtmosphereResource(AtmosphereResource r)
Adds an AtmosphereResource to the list of items to be notified when Broadcaster.broadcast(java.lang.Object) is invoked. It is strongly recommended to suspend the AtmosphereResource before adding it to a Broadcaster.
Specified by: addAtmosphereResource in interface Broadcaster
Parameters: r - an AtmosphereResource
protected void cacheAndSuspend(AtmosphereResource r)
Parameters: r - AtmosphereResource
protected void notifyAndAdd(AtmosphereResource r)
protected void entryDone(BroadcasterFuture<?> f)
protected void notifyBroadcastListener()
protected void notifyOnAddAtmosphereResourceListener(AtmosphereResource r)
protected void notifyOnRemoveAtmosphereResourceListener(AtmosphereResource r)
protected void notifyOnMessage(Deliver deliver)
public Broadcaster removeAtmosphereResource(AtmosphereResource r)
Removes an AtmosphereResource from the list of items to be notified when Broadcaster.broadcast(java.lang.Object) is invoked.
Specified by: removeAtmosphereResource in interface Broadcaster
Parameters: r - an AtmosphereResource
protected Broadcaster removeAtmosphereResource(AtmosphereResource r, boolean executeDone)
public void setBroadcasterConfig(BroadcasterConfig bc)
Sets the BroadcasterConfig instance.
Specified by: setBroadcasterConfig in interface Broadcaster
Parameters: bc - Configuration to be set.
public BroadcasterConfig getBroadcasterConfig()
Returns the BroadcasterConfig.
Specified by: getBroadcasterConfig in interface Broadcaster
Returns: the BroadcasterConfig
public Future<Object> delayBroadcast(Object o)
Delays the broadcast. The Object will be broadcast the first time Broadcaster.broadcast(java.lang.Object) is invoked.
Specified by: delayBroadcast in interface Broadcaster
Parameters: o - the Object to be broadcast
Returns: a Future that can be used to synchronize using Future.get()
public Future<Object> delayBroadcast(Object o, long delay, TimeUnit t)
Delays the broadcast. The Object will be broadcast once the specified delay expires or the first time Broadcaster.broadcast(java.lang.Object) is invoked.
Specified by: delayBroadcast in interface Broadcaster
Parameters: o - the Object to be broadcast
delay - amount of time to delay
t - the TimeUnit of the delay
Returns: a Future that can be used to synchronize using Future.get()
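The "delay expires or broadcast is invoked, whichever comes first" rule can be sketched with a pending queue plus a ScheduledExecutorService. This is a hedged, JDK-only illustration: DelayedBroadcastSketch, Pending and the delivered list are hypothetical, and delivering queued messages before the triggering message is an assumption of the sketch, not documented behavior of DefaultBroadcaster.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a broadcaster; not the Atmosphere class.
final class DelayedBroadcastSketch {
    // Wrapper so that two equal messages are still distinct queue entries.
    private static final class Pending {
        final Object msg;
        Pending(Object msg) { this.msg = msg; }
    }

    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final Queue<Pending> delayed = new ConcurrentLinkedQueue<>();
    final List<Object> delivered = new CopyOnWriteArrayList<>();

    void delayBroadcast(Object msg, long delay, TimeUnit unit) {
        Pending p = new Pending(msg);
        delayed.add(p);
        // The timer delivers only if broadcast() has not already claimed the entry.
        scheduler.schedule(() -> {
            if (delayed.remove(p)) {
                delivered.add(p.msg);
            }
        }, delay, unit);
    }

    void broadcast(Object msg) {
        Pending p;
        // Assumption: queued messages go out before the triggering message.
        while ((p = delayed.poll()) != null) {
            delivered.add(p.msg);
        }
        delivered.add(msg);
    }

    // A regular broadcast flushes the delayed message long before its timer fires.
    static List<Object> demoFlushOnBroadcast() {
        DelayedBroadcastSketch b = new DelayedBroadcastSketch();
        b.delayBroadcast("queued", 1, TimeUnit.HOURS);
        b.broadcast("now");
        b.scheduler.shutdownNow();
        return b.delivered;
    }

    // No broadcast happens, so the timer delivers the message.
    static List<Object> demoExpiry() {
        DelayedBroadcastSketch b = new DelayedBroadcastSketch();
        b.delayBroadcast("late", 20, TimeUnit.MILLISECONDS);
        b.scheduler.shutdown(); // already-scheduled delayed tasks still run by default
        try {
            b.scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return b.delivered;
    }

    public static void main(String[] args) {
        System.out.println(demoFlushOnBroadcast()); // prints [queued, now]
        System.out.println(demoExpiry());           // prints [late]
    }
}
```

Both paths remove the entry from the queue before delivering it, so each delayed message is sent exactly once regardless of which trigger wins.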
public Future<Object> scheduleFixedBroadcast(Object o, long period, TimeUnit t)
Broadcasts periodically. The Object will be broadcast each time the specified period expires. If the TimeUnit is null, the Object will be broadcast the first time Broadcaster.broadcast(java.lang.Object) is invoked.
Specified by: scheduleFixedBroadcast in interface Broadcaster
Parameters: o - the Object to be broadcast
period - how often to broadcast
t - the TimeUnit of period
Returns: a Future that can be used to synchronize using Future.get()
public Future<Object> scheduleFixedBroadcast(Object o, long waitFor, long period, TimeUnit t)
Broadcasts periodically. The Object will be broadcast each time the specified period expires. If the TimeUnit is null, the Object will be broadcast the first time Broadcaster.broadcast(java.lang.Object) is invoked.
Specified by: scheduleFixedBroadcast in interface Broadcaster
Parameters: o - the Object to be broadcast
waitFor - how long to wait before the first broadcast
period - the period between broadcasts
t - the TimeUnit of waitFor and period
Returns: a Future that can be used to synchronize using Future.get()
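The waitFor and period parameters map naturally onto the initial delay and period of a JDK ScheduledExecutorService. The sketch below shows that mapping in isolation. FixedBroadcastSketch and deliveriesAfter are hypothetical names, and the use of scheduleAtFixedRate is an assumption of this example, since the scheduling call used inside DefaultBroadcaster is not documented here.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of a fixed-period broadcast; not the Atmosphere class.
final class FixedBroadcastSketch {

    // Schedules a repeating delivery, waits until at least `wanted`
    // deliveries have happened, then cancels and reports the count.
    static int deliveriesAfter(int wanted) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        List<Object> delivered = new CopyOnWriteArrayList<>();
        CountDownLatch seen = new CountDownLatch(wanted);

        // waitFor = 10 ms before the first delivery, period = 5 ms afterwards.
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            delivered.add("tick");
            seen.countDown();
        }, 10, 5, TimeUnit.MILLISECONDS);

        try {
            seen.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            handle.cancel(false);    // stops further periodic deliveries
            scheduler.shutdownNow();
        }
        return delivered.size();
    }

    public static void main(String[] args) {
        // The count can exceed the requested number if a tick fires before cancel().
        System.out.println(deliveriesAfter(3) >= 3);
    }
}
```

The returned handle plays the role of the Future in the method signature: a periodic task never completes normally, so in this sketch the handle is useful for cancellation rather than for waiting on a result.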
public Collection<BroadcasterListener> broadcasterListeners()
public BroadcasterLifeCyclePolicy lifeCyclePolicy()
public ConcurrentLinkedQueue<BroadcasterLifeCyclePolicyListener> lifeCycleListeners()
public BlockingQueue<Deliver> messages()
public ConcurrentHashMap<String,DefaultBroadcaster.WriteQueue> writeQueues()
public Broadcaster.POLICY policy()
public boolean outOfOrderBroadcastSupported()
public AtomicBoolean recentActivity()
public LifecycleHandler lifecycleHandler()
public DefaultBroadcaster lifecycleHandler(LifecycleHandler lifecycleHandler)
public Future<?> currentLifecycleTask()
public DefaultBroadcaster currentLifecycleTask(Future<?> currentLifecycleTask)
Copyright © 2016. All rights reserved.