class{ privatestaticfinalint MAX_AVAILABLE = 100; privatefinal Semaphore available = new Semaphore(MAX_AVAILABLE, true);
public Object getItem()throws InterruptedException { available.acquire(); return getNextAvailableItem(); }
publicvoidputItem(Object x){ if (markAsUnused(x)) available.release(); }
protected Object[] items = new Object[]{};//... whatever kinds of items being managed.存放ok protectedboolean[] used = newboolean[MAX_AVAILABLE];
protectedsynchronized Object getNextAvailableItem(){ for (int i = 0; i < MAX_AVAILABLE; ++i) { if (!used[i]) { used[i] = true; return items[i]; } } returnnull; // not reached }
protectedsynchronizedbooleanmarkAsUnused(Object item){ for (int i = 0; i < MAX_AVAILABLE; ++i) { if (item == items[i]) { if (used[i]) { used[i] = false; returntrue; } else returnfalse; } } returnfalse; } }
Before obtaining an item each thread must acquire a permit from the semaphore, guaranteeing that an item is available for use.
When the thread has finished with the item it is returned back to the pool and a permit is returned to the semaphore, allowing another thread to acquire that item.
Note that no synchronization lock is held when {@link #acquire} is calledas that would prevent an item from being returned to the pool.
The semaphore encapsulates the synchronization needed to restrict access to the pool, separately from any synchronization needed to maintain the consistency of the pool itself.
publicclassSemaphoreimplementsjava.io.Serializable{ /** All mechanics via AbstractQueuedSynchronizer subclass */ privatefinal Sync sync; /** * Synchronization implementation for semaphore. * Uses AQS state to represent permits. * Subclassed into fair and nonfair versions. */ abstractstaticclassSyncextendsAbstractQueuedSynchronizer{ Sync(int permits) { setState(permits); }
finalintgetPermits(){ return getState(); }
finalintnonfairTryAcquireShared(int acquires){ for (;;) { int available = getState(); int remaining = available - acquires; if (remaining < 0 ||//state means resource. remaining<0 will wait and enq AQS队列 compareAndSetState(available, remaining))//获取锁 return remaining; } }
protectedfinalbooleantryReleaseShared(int releases){ for (;;) { int current = getState(); int next = current + releases;//release the resource if (next < current) // overflow 有符号数越界 thrownew Error("Maximum permit count exceeded"); if (compareAndSetState(current, next)) returntrue; } }
/////////////////////////////////////////////////////////////////////// finalvoidreducePermits(int reductions){ for (;;) { int current = getState(); int next = current - reductions; if (next > current) // underflow thrownew Error("Permit count underflow"); if (compareAndSetState(current, next)) return; } } finalintdrainPermits(){ for (;;) { int current = getState(); if (current == 0 || compareAndSetState(current, 0)) return current; } } }
protectedinttryAcquireShared(int acquires){ for (;;) { if (hasQueuedPredecessors())//存在前驱节点 return -1; int available = getState(); int remaining = available - acquires; if (remaining < 0 ||//remaining<0 等待 compareAndSetState(available, remaining)) return remaining; } } }
/** * Acquires and returns all permits that are immediately available. * @return the number of permits acquired */ publicintdrainPermits(){ return sync.drainPermits(); }
/** * Shrinks the number of available permits by the indicated reduction. * This method can be useful in subclasses that use semaphores to track resources that become unavailable. * This method differs from {@code acquire} in that it does not block waiting for permits to become available. * @param reduction the number of permits to remove * @throws IllegalArgumentException if {@code reduction} is negative */ protectedvoidreducePermits(int reduction){ if (reduction < 0) thrownew IllegalArgumentException(); sync.reducePermits(reduction); } }
近期评论