(e));
+ ++n;
+ }
+ count.set(n);
+ } finally {
+ putLock.unlock();
+ }
+ }
+
+ // this doc comment is overridden to remove the reference to collections
+ // greater in size than Integer.MAX_VALUE
+
+ /**
+ * Returns the number of elements in this queue.
+ *
+ * @return the number of elements in this queue
+ */
+ public int size() {
+ return count.get();
+ }
+
+ // this doc comment is a modified copy of the inherited doc comment,
+ // without the reference to unlimited queues.
+
+ /**
+ * Returns the number of additional elements that this queue can ideally
+ * (in the absence of memory or resource constraints) accept without
+ * blocking. This is always equal to the initial capacity of this queue
+ * less the current {@code size} of this queue.
+ *
+ *
Note that you cannot always tell if an attempt to insert
+ * an element will succeed by inspecting {@code remainingCapacity}
+ * because it may be the case that another thread is about to
+ * insert or remove an element.
+ */
+ public int remainingCapacity() {
+ return capacity - count.get();
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue, waiting if
+ * necessary for space to become available.
+ *
+ * @throws InterruptedException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public void put(E e) throws InterruptedException {
+ if (e == null) throw new NullPointerException();
+ // Note: convention in all put/take/etc is to preset local var
+ // holding count negative to indicate failure unless set.
+ int c = -1;
+ Node node = new Node(e);
+ final ReentrantLock putLock = this.putLock;
+ final AtomicInteger count = this.count;
+ putLock.lockInterruptibly();
+ try {
+ /*
+ * Note that count is used in wait guard even though it is
+ * not protected by lock. This works because count can
+ * only decrease at this point (all other puts are shut
+ * out by lock), and we (or some other waiting put) are
+ * signalled if it ever changes from capacity. Similarly
+ * for all other uses of count in other wait guards.
+ */
+ while (count.get() == capacity) {
+ notFull.await();
+ }
+ enqueue(node);
+ c = count.getAndIncrement();
+ if (c + 1 < capacity)
+ notFull.signal();
+ } finally {
+ putLock.unlock();
+ }
+ if (c == 0)
+ signalNotEmpty();
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue, waiting if
+ * necessary up to the specified wait time for space to become available.
+ *
+ * @return {@code true} if successful, or {@code false} if
+ * the specified waiting time elapses before space is available
+ * @throws InterruptedException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ */
+ public boolean offer(E e, long timeout, TimeUnit unit)
+ throws InterruptedException {
+
+ if (e == null) throw new NullPointerException();
+ long nanos = unit.toNanos(timeout);
+ int c = -1;
+ final ReentrantLock putLock = this.putLock;
+ final AtomicInteger count = this.count;
+ putLock.lockInterruptibly();
+ try {
+ while (count.get() == capacity) {
+ if (nanos <= 0) {
+ break;
+ }
+ nanos = notFull.awaitNanos(nanos);
+ }
+ enqueue(new Node(e));
+ c = count.getAndIncrement();
+ if (c + 1 < capacity)
+ notFull.signal();
+ } finally {
+ putLock.unlock();
+ }
+ if (c == 0)
+ signalNotEmpty();
+ return true;
+ }
+
+ /**
+ * Inserts the specified element at the tail of this queue if it is
+ * possible to do so immediately without exceeding the queue's capacity,
+ * returning {@code true} upon success and {@code false} if this queue
+ * is full.
+ * When using a capacity-restricted queue, this method is generally
+ * preferable to method {@link BlockingQueue#add add}, which can fail to
+ * insert an element only by throwing an exception.
+ *
+ * @throws NullPointerException if the specified element is null
+ */
+ public boolean offer(E e) {
+ if (e == null) throw new NullPointerException();
+ final AtomicInteger count = this.count;
+ int c = -1;
+ Node node = new Node(e);
+ final ReentrantLock putLock = this.putLock;
+ putLock.lock();
+ try {
+ if (count.get() >= capacity) {
+ takeLock.lock();
+ try {
+ if (count.get() > 0) {
+ E dropOne = dequeue();
+ LOGGER.warn("Drop AlertTask:" + dropOne.toString());
+ c = count.getAndDecrement();
+ }
+ } finally {
+ takeLock.unlock();
+ }
+ }
+ enqueue(node);
+ c = count.getAndIncrement();
+ if (c + 1 < capacity)
+ notFull.signal();
+ } finally {
+ putLock.unlock();
+ }
+ if (c == 0)
+ signalNotEmpty();
+ return c >= 0;
+ }
+
+ public E take() throws InterruptedException {
+ E x;
+ int c = -1;
+ final AtomicInteger count = this.count;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lockInterruptibly();
+ try {
+ while (count.get() == 0) {
+ notEmpty.await();
+ }
+ x = dequeue();
+ c = count.getAndDecrement();
+ if (c > 1)
+ notEmpty.signal();
+ } finally {
+ takeLock.unlock();
+ }
+ if (c == capacity)
+ signalNotFull();
+ return x;
+ }
+
+ public E poll(long timeout, TimeUnit unit) throws InterruptedException {
+ E x = null;
+ int c = -1;
+ long nanos = unit.toNanos(timeout);
+ final AtomicInteger count = this.count;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lockInterruptibly();
+ try {
+ while (count.get() == 0) {
+ if (nanos <= 0)
+ return null;
+ nanos = notEmpty.awaitNanos(nanos);
+ }
+ x = dequeue();
+ c = count.getAndDecrement();
+ if (c > 1)
+ notEmpty.signal();
+ } finally {
+ takeLock.unlock();
+ }
+ if (c == capacity)
+ signalNotFull();
+ return x;
+ }
+
+ public E poll() {
+ final AtomicInteger count = this.count;
+ if (count.get() == 0)
+ return null;
+ E x = null;
+ int c = -1;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lock();
+ try {
+ if (count.get() > 0) {
+ x = dequeue();
+ c = count.getAndDecrement();
+ if (c > 1)
+ notEmpty.signal();
+ }
+ } finally {
+ takeLock.unlock();
+ }
+ if (c == capacity)
+ signalNotFull();
+ return x;
+ }
+
+ public E peek() {
+ if (count.get() == 0)
+ return null;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lock();
+ try {
+ Node first = head.next;
+ if (first == null)
+ return null;
+ else
+ return first.item;
+ } finally {
+ takeLock.unlock();
+ }
+ }
+
+ /**
+ * Unlinks interior Node p with predecessor trail.
+ */
+ void unlink(Node p, Node trail) {
+ // assert isFullyLocked();
+ // p.next is not changed, to allow iterators that are
+ // traversing p to maintain their weak-consistency guarantee.
+ p.item = null;
+ trail.next = p.next;
+ if (last == p)
+ last = trail;
+ if (count.getAndDecrement() == capacity)
+ notFull.signal();
+ }
+
+ /**
+ * Removes a single instance of the specified element from this queue,
+ * if it is present. More formally, removes an element {@code e} such
+ * that {@code o.equals(e)}, if this queue contains one or more such
+ * elements.
+ * Returns {@code true} if this queue contained the specified element
+ * (or equivalently, if this queue changed as a result of the call).
+ *
+ * @param o element to be removed from this queue, if present
+ * @return {@code true} if this queue changed as a result of the call
+ */
+ public boolean remove(Object o) {
+ if (o == null) return false;
+ fullyLock();
+ try {
+ for (Node trail = head, p = trail.next;
+ p != null;
+ trail = p, p = p.next) {
+ if (o.equals(p.item)) {
+ unlink(p, trail);
+ return true;
+ }
+ }
+ return false;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
+ * Returns {@code true} if this queue contains the specified element.
+ * More formally, returns {@code true} if and only if this queue contains
+ * at least one element {@code e} such that {@code o.equals(e)}.
+ *
+ * @param o object to be checked for containment in this queue
+ * @return {@code true} if this queue contains the specified element
+ */
+ public boolean contains(Object o) {
+ if (o == null) return false;
+ fullyLock();
+ try {
+ for (Node p = head.next; p != null; p = p.next)
+ if (o.equals(p.item))
+ return true;
+ return false;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
+ * Returns an array containing all of the elements in this queue, in
+ * proper sequence.
+ *
+ *
The returned array will be "safe" in that no references to it are
+ * maintained by this queue. (In other words, this method must allocate
+ * a new array). The caller is thus free to modify the returned array.
+ *
+ *
This method acts as bridge between array-based and collection-based
+ * APIs.
+ *
+ * @return an array containing all of the elements in this queue
+ */
+ public Object[] toArray() {
+ fullyLock();
+ try {
+ int size = count.get();
+ Object[] a = new Object[size];
+ int k = 0;
+ for (Node p = head.next; p != null; p = p.next)
+ a[k++] = p.item;
+ return a;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
+ * Returns an array containing all of the elements in this queue, in
+ * proper sequence; the runtime type of the returned array is that of
+ * the specified array. If the queue fits in the specified array, it
+ * is returned therein. Otherwise, a new array is allocated with the
+ * runtime type of the specified array and the size of this queue.
+ *
+ *
If this queue fits in the specified array with room to spare
+ * (i.e., the array has more elements than this queue), the element in
+ * the array immediately following the end of the queue is set to
+ * {@code null}.
+ *
+ *
Like the {@link #toArray()} method, this method acts as bridge between
+ * array-based and collection-based APIs. Further, this method allows
+ * precise control over the runtime type of the output array, and may,
+ * under certain circumstances, be used to save allocation costs.
+ *
+ *
Suppose {@code x} is a queue known to contain only strings.
+ * The following code can be used to dump the queue into a newly
+ * allocated array of {@code String}:
+ *
+ *
{@code String[] y = x.toArray(new String[0]);}
+ *
+ * Note that {@code toArray(new Object[0])} is identical in function to
+ * {@code toArray()}.
+ *
+ * @param a the array into which the elements of the queue are to
+ * be stored, if it is big enough; otherwise, a new array of the
+ * same runtime type is allocated for this purpose
+ * @return an array containing all of the elements in this queue
+ * @throws ArrayStoreException if the runtime type of the specified array
+ * is not a supertype of the runtime type of every element in
+ * this queue
+ * @throws NullPointerException if the specified array is null
+ */
+ @SuppressWarnings("unchecked")
+ public T[] toArray(T[] a) {
+ fullyLock();
+ try {
+ int size = count.get();
+ if (a.length < size)
+ a = (T[]) java.lang.reflect.Array.newInstance
+ (a.getClass().getComponentType(), size);
+
+ int k = 0;
+ for (Node p = head.next; p != null; p = p.next)
+ a[k++] = (T) p.item;
+ if (a.length > k)
+ a[k] = null;
+ return a;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ public String toString() {
+ fullyLock();
+ try {
+ Node p = head.next;
+ if (p == null)
+ return "[]";
+
+ StringBuilder sb = new StringBuilder();
+ sb.append('[');
+ for (; ; ) {
+ E e = p.item;
+ sb.append(e == this ? "(this Collection)" : e);
+ p = p.next;
+ if (p == null)
+ return sb.append(']').toString();
+ sb.append(',').append(' ');
+ }
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
+ * Atomically removes all of the elements from this queue.
+ * The queue will be empty after this call returns.
+ */
+ public void clear() {
+ fullyLock();
+ try {
+ for (Node p, h = head; (p = h.next) != null; h = p) {
+ h.next = h;
+ p.item = null;
+ }
+ head = last;
+ // assert head.item == null && head.next == null;
+ if (count.getAndSet(0) == capacity)
+ notFull.signal();
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
+ * @throws UnsupportedOperationException {@inheritDoc}
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public int drainTo(Collection super E> c) {
+ return drainTo(c, Integer.MAX_VALUE);
+ }
+
+ /**
+ * @throws UnsupportedOperationException {@inheritDoc}
+ * @throws ClassCastException {@inheritDoc}
+ * @throws NullPointerException {@inheritDoc}
+ * @throws IllegalArgumentException {@inheritDoc}
+ */
+ public int drainTo(Collection super E> c, int maxElements) {
+ if (c == null)
+ throw new NullPointerException();
+ if (c == this)
+ throw new IllegalArgumentException();
+ if (maxElements <= 0)
+ return 0;
+ boolean signalNotFull = false;
+ final ReentrantLock takeLock = this.takeLock;
+ takeLock.lock();
+ try {
+ int n = Math.min(maxElements, count.get());
+ // count.get provides visibility to first n Nodes
+ Node h = head;
+ int i = 0;
+ try {
+ while (i < n) {
+ Node p = h.next;
+ c.add(p.item);
+ p.item = null;
+ h.next = h;
+ h = p;
+ ++i;
+ }
+ return n;
+ } finally {
+ // Restore invariants even if c.add() threw
+ if (i > 0) {
+ // assert h.item == null;
+ head = h;
+ signalNotFull = (count.getAndAdd(-i) == capacity);
+ }
+ }
+ } finally {
+ takeLock.unlock();
+ if (signalNotFull)
+ signalNotFull();
+ }
+ }
+
+ /**
+ * Returns an iterator over the elements in this queue in proper sequence.
+ * The elements will be returned in order from first (head) to last (tail).
+ *
+ *
The returned iterator is
+ * weakly consistent.
+ *
+ * @return an iterator over the elements in this queue in proper sequence
+ */
+ public Iterator iterator() {
+ return new Itr();
+ }
+
+ private class Itr implements Iterator {
+ /*
+ * Basic weakly-consistent iterator. At all times hold the next
+ * item to hand out so that if hasNext() reports true, we will
+ * still have it to return even if lost race with a take etc.
+ */
+
+ private Node current;
+ private Node lastRet;
+ private E currentElement;
+
+ Itr() {
+ fullyLock();
+ try {
+ current = head.next;
+ if (current != null)
+ currentElement = current.item;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ public boolean hasNext() {
+ return current != null;
+ }
+
+ /**
+ * Returns the next live successor of p, or null if no such.
+ *
+ * Unlike other traversal methods, iterators need to handle both:
+ * - dequeued nodes (p.next == p)
+ * - (possibly multiple) interior removed nodes (p.item == null)
+ */
+ private Node nextNode(Node p) {
+ for (; ; ) {
+ Node s = p.next;
+ if (s == p)
+ return head.next;
+ if (s == null || s.item != null)
+ return s;
+ p = s;
+ }
+ }
+
+ public E next() {
+ fullyLock();
+ try {
+ if (current == null)
+ throw new NoSuchElementException();
+ E x = currentElement;
+ lastRet = current;
+ current = nextNode(current);
+ currentElement = (current == null) ? null : current.item;
+ return x;
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ public void remove() {
+ if (lastRet == null)
+ throw new IllegalStateException();
+ fullyLock();
+ try {
+ Node node = lastRet;
+ lastRet = null;
+ for (Node trail = head, p = trail.next;
+ p != null;
+ trail = p, p = p.next) {
+ if (p == node) {
+ unlink(p, trail);
+ break;
+ }
+ }
+ } finally {
+ fullyUnlock();
+ }
+ }
+ }
+
+ /**
+ * A customized variant of Spliterators.IteratorSpliterator
+ */
+ static final class LBQSpliterator implements Spliterator {
+ static final int MAX_BATCH = 1 << 25; // max batch array size;
+ final AlertBlockQueue queue;
+ Node current; // current node; null until initialized
+ int batch; // batch size for splits
+ boolean exhausted; // true when no more nodes
+ long est; // size estimate
+
+ LBQSpliterator(AlertBlockQueue queue) {
+ this.queue = queue;
+ this.est = queue.size();
+ }
+
+ public long estimateSize() {
+ return est;
+ }
+
+ public Spliterator trySplit() {
+ Node h;
+ final AlertBlockQueue q = this.queue;
+ int b = batch;
+ int n = (b <= 0) ? 1 : (b >= MAX_BATCH) ? MAX_BATCH : b + 1;
+ if (!exhausted &&
+ ((h = current) != null || (h = q.head.next) != null) &&
+ h.next != null) {
+ Object[] a = new Object[n];
+ int i = 0;
+ Node p = current;
+ q.fullyLock();
+ try {
+ if (p != null || (p = q.head.next) != null) {
+ do {
+ if ((a[i] = p.item) != null)
+ ++i;
+ } while ((p = p.next) != null && i < n);
+ }
+ } finally {
+ q.fullyUnlock();
+ }
+ if ((current = p) == null) {
+ est = 0L;
+ exhausted = true;
+ } else if ((est -= i) < 0L)
+ est = 0L;
+ if (i > 0) {
+ batch = i;
+ return Spliterators.spliterator
+ (a, 0, i, Spliterator.ORDERED | Spliterator.NONNULL |
+ Spliterator.CONCURRENT);
+ }
+ }
+ return null;
+ }
+
+ public void forEachRemaining(Consumer super E> action) {
+ if (action == null) throw new NullPointerException();
+ final AlertBlockQueue q = this.queue;
+ if (!exhausted) {
+ exhausted = true;
+ Node p = current;
+ do {
+ E e = null;
+ q.fullyLock();
+ try {
+ if (p == null)
+ p = q.head.next;
+ while (p != null) {
+ e = p.item;
+ p = p.next;
+ if (e != null)
+ break;
+ }
+ } finally {
+ q.fullyUnlock();
+ }
+ if (e != null)
+ action.accept(e);
+ } while (p != null);
+ }
+ }
+
+ public boolean tryAdvance(Consumer super E> action) {
+ if (action == null) throw new NullPointerException();
+ final AlertBlockQueue q = this.queue;
+ if (!exhausted) {
+ E e = null;
+ q.fullyLock();
+ try {
+ if (current == null)
+ current = q.head.next;
+ while (current != null) {
+ e = current.item;
+ current = current.next;
+ if (e != null)
+ break;
+ }
+ } finally {
+ q.fullyUnlock();
+ }
+ if (current == null)
+ exhausted = true;
+ if (e != null) {
+ action.accept(e);
+ return true;
+ }
+ }
+ return false;
+ }
+
+ public int characteristics() {
+ return Spliterator.ORDERED | Spliterator.NONNULL |
+ Spliterator.CONCURRENT;
+ }
+ }
+
+ /**
+ * Returns a {@link Spliterator} over the elements in this queue.
+ *
+ *
The returned spliterator is
+ * weakly consistent.
+ *
+ *
The {@code Spliterator} reports {@link Spliterator#CONCURRENT},
+ * {@link Spliterator#ORDERED}, and {@link Spliterator#NONNULL}.
+ *
+ * @return a {@code Spliterator} over the elements in this queue
+ * @implNote The {@code Spliterator} implements {@code trySplit} to permit limited
+ * parallelism.
+ * @since 1.8
+ */
+ public Spliterator spliterator() {
+ return new LBQSpliterator(this);
+ }
+
+ /**
+ * Saves this queue to a stream (that is, serializes it).
+ *
+ * @param s the stream
+ * @throws java.io.IOException if an I/O error occurs
+ * @serialData The capacity is emitted (int), followed by all of
+ * its elements (each an {@code Object}) in the proper order,
+ * followed by a null
+ */
+ private void writeObject(java.io.ObjectOutputStream s)
+ throws java.io.IOException {
+
+ fullyLock();
+ try {
+ // Write out any hidden stuff, plus capacity
+ s.defaultWriteObject();
+
+ // Write out all elements in the proper order.
+ for (Node p = head.next; p != null; p = p.next)
+ s.writeObject(p.item);
+
+ // Use trailing null as sentinel
+ s.writeObject(null);
+ } finally {
+ fullyUnlock();
+ }
+ }
+
+ /**
+ * Reconstitutes this queue from a stream (that is, deserializes it).
+ *
+ * @param s the stream
+ * @throws ClassNotFoundException if the class of a serialized object
+ * could not be found
+ * @throws java.io.IOException if an I/O error occurs
+ */
+ private void readObject(java.io.ObjectInputStream s)
+ throws java.io.IOException, ClassNotFoundException {
+ // Read in capacity, and any hidden stuff
+ s.defaultReadObject();
+
+ count.set(0);
+ last = head = new Node(null);
+
+ // Read in all elements and place in queue
+ for (; ; ) {
+ @SuppressWarnings("unchecked")
+ E item = (E) s.readObject();
+ if (item == null)
+ break;
+ add(item);
+ }
+ }
+}
\ No newline at end of file
diff --git a/src/main/java/com/actiontech/dble/alarm/AlertSender.java b/src/main/java/com/actiontech/dble/alarm/AlertSender.java
new file mode 100644
index 000000000..a5eced110
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/alarm/AlertSender.java
@@ -0,0 +1,70 @@
+package com.actiontech.dble.alarm;
+
+import com.actiontech.dble.DbleServer;
+import com.actiontech.dble.cluster.ClusterController;
+import com.actiontech.dble.cluster.ClusterGeneralConfig;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Created by szf on 2019/3/22.
+ */
+public class AlertSender implements Runnable {
+ private static final Logger LOGGER = LoggerFactory.getLogger(AlertSender.class);
+
+ private final BlockingQueue alertQueue;
+
+ private static final Alert DEFAULT_ALERT = new NoAlert();
+ private static volatile Alert alert;
+
+ public AlertSender(BlockingQueue alertQueue) {
+ this.alertQueue = alertQueue;
+ initAlert();
+ }
+
+ @Override
+ public void run() {
+ AlertTask alertTask;
+ while (true) {
+ try {
+ alertTask = alertQueue.take();
+
+ switch (alertTask.getAlertType()) {
+ case ALERT:
+ alert.alert(alertTask.getAlertBean());
+ break;
+ case ALERT_SELF:
+ alert.alertSelf(alertTask.getAlertBean());
+ break;
+ case ALERT_RESOLVE:
+ if (alert.alertResolve(alertTask.getAlertBean())) {
+ alertTask.alertCallBack();
+ }
+ break;
+ case ALERT_SELF_RESOLVE:
+ if (alert.alertSelfResolve(alertTask.getAlertBean())) {
+ alertTask.alertCallBack();
+ }
+ break;
+ default:
+ break;
+ }
+ } catch (Throwable e) {
+ LOGGER.error("get error when send queue", e);
+ }
+ }
+ }
+
+
+ public void initAlert() {
+ if (DbleServer.getInstance().isUseGeneralCluster() &&
+ (ClusterController.CONFIG_MODE_UCORE.equals(ClusterGeneralConfig.getInstance().getClusterType()) ||
+ ClusterController.CONFIG_MODE_USHARD.equals(ClusterGeneralConfig.getInstance().getClusterType()))) {
+ alert = UcoreAlert.getInstance();
+ } else {
+ alert = DEFAULT_ALERT;
+ }
+ }
+}
diff --git a/src/main/java/com/actiontech/dble/alarm/AlertTask.java b/src/main/java/com/actiontech/dble/alarm/AlertTask.java
new file mode 100644
index 000000000..b13236a61
--- /dev/null
+++ b/src/main/java/com/actiontech/dble/alarm/AlertTask.java
@@ -0,0 +1,43 @@
+package com.actiontech.dble.alarm;
+
+import com.actiontech.dble.cluster.bean.ClusterAlertBean;
+
+import java.util.Set;
+
+/**
+ * Created by szf on 2019/3/29.
+ */
+public class AlertTask {
+
+ private ClusterAlertBean alertBean;
+ private Alert.AlertType alertType;
+ private Set callbackSet;
+ private String callbackKey;
+
+
+ public AlertTask(Alert.AlertType alertType, Set callbackSet, String callbackKey, ClusterAlertBean alertBean) {
+ this.alertBean = alertBean;
+ this.alertType = alertType;
+ this.callbackSet = callbackSet;
+ this.callbackKey = callbackKey;
+
+ }
+
+
+ public ClusterAlertBean getAlertBean() {
+ return alertBean;
+ }
+
+ public Alert.AlertType getAlertType() {
+ return alertType;
+ }
+
+
+ public void alertCallBack() {
+ if (callbackSet != null &&
+ callbackKey != null) {
+ callbackSet.remove(callbackKey);
+ }
+ }
+
+}
diff --git a/src/main/java/com/actiontech/dble/alarm/AlertUtil.java b/src/main/java/com/actiontech/dble/alarm/AlertUtil.java
index 4dfc0bda1..3833e05a4 100644
--- a/src/main/java/com/actiontech/dble/alarm/AlertUtil.java
+++ b/src/main/java/com/actiontech/dble/alarm/AlertUtil.java
@@ -5,62 +5,62 @@
package com.actiontech.dble.alarm;
-import com.actiontech.dble.DbleServer;
-import com.actiontech.dble.cluster.ClusterController;
-import com.actiontech.dble.cluster.ClusterGeneralConfig;
+import com.actiontech.dble.cluster.bean.ClusterAlertBean;
+import com.actiontech.dble.server.status.AlertManager;
import java.util.HashMap;
import java.util.Map;
+import java.util.Set;
public final class AlertUtil {
private AlertUtil() {
}
- private static volatile Alert alert;
- private static final Alert DEFAULT_ALERT = new NoAlert();
private static volatile boolean isEnable = false;
- static {
- alert = DEFAULT_ALERT;
- }
-
public static void switchAlert(boolean enableAlert) {
isEnable = enableAlert;
}
- public static void initAlert() {
- if (DbleServer.getInstance().isUseGeneralCluster() &&
- (ClusterController.CONFIG_MODE_UCORE.equals(ClusterGeneralConfig.getInstance().getClusterType()) ||
- ClusterController.CONFIG_MODE_USHARD.equals(ClusterGeneralConfig.getInstance().getClusterType()))) {
- alert = UcoreAlert.getInstance();
- } else {
- alert = DEFAULT_ALERT;
- }
- }
-
public static boolean isEnable() {
return isEnable;
}
public static void alertSelf(String code, Alert.AlertLevel level, String desc, Map labels) {
if (isEnable) {
- alert.alertSelf(code, level, desc, labels);
+ ClusterAlertBean bean = new ClusterAlertBean();
+ bean.setCode(code).setLevel(level.toString()).setDesc(desc).setLabels(labels);
+ AlertTask task = new AlertTask(Alert.AlertType.ALERT_SELF, null, null, bean);
+ AlertManager.getInstance().getAlertQueue().offer(task);
}
}
public static void alert(String code, Alert.AlertLevel level, String desc, String alertComponentType, String alertComponentId, Map labels) {
if (isEnable) {
- alert.alert(code, level, desc, alertComponentType, alertComponentId, labels);
+ ClusterAlertBean bean = new ClusterAlertBean();
+ bean.setCode(code).setLevel(level.toString()).setDesc(desc).setLabels(labels).setAlertComponentType(alertComponentType).setAlertComponentId(alertComponentId);
+ AlertTask task = new AlertTask(Alert.AlertType.ALERT, null, null, bean);
+ AlertManager.getInstance().getAlertQueue().offer(task);
}
}
- public static boolean alertResolve(String code, Alert.AlertLevel level, String alertComponentType, String alertComponentId, Map labels) {
- return isEnable ? alert.alertResolve(code, level, alertComponentType, alertComponentId, labels) : true;
+ public static void alertResolve(String code, Alert.AlertLevel level, String alertComponentType, String alertComponentId, Map labels, Set callbackSet, String callbackKey) {
+ if (isEnable) {
+ ClusterAlertBean bean = new ClusterAlertBean();
+ bean.setCode(code).setLevel(level.toString()).setAlertComponentId(alertComponentId).setAlertComponentType(alertComponentType).setLabels(labels);
+ AlertTask task = new AlertTask(Alert.AlertType.ALERT_RESOLVE, callbackSet, callbackKey, bean);
+ AlertManager.getInstance().getAlertQueue().offer(task);
+ }
}
- public static boolean alertSelfResolve(String code, Alert.AlertLevel level, Map labels) {
- return isEnable ? alert.alertSelfResolve(code, level, labels) : true;
+ public static void alertSelfResolve(String code, Alert.AlertLevel level, Map labels, Set callbackSet, String callbackKey) {
+ if (isEnable) {
+ ClusterAlertBean bean = new ClusterAlertBean();
+ bean.setCode(code).setLevel(level.toString()).setLabels(labels);
+ AlertTask task = new AlertTask(Alert.AlertType.ALERT_SELF_RESOLVE, callbackSet, callbackKey, bean);
+ AlertManager.getInstance().getAlertQueue().offer(task);
+ }
}
public static Map genSingleLabel(String key, String value) {
diff --git a/src/main/java/com/actiontech/dble/alarm/NoAlert.java b/src/main/java/com/actiontech/dble/alarm/NoAlert.java
index 3b012bb62..18031d002 100644
--- a/src/main/java/com/actiontech/dble/alarm/NoAlert.java
+++ b/src/main/java/com/actiontech/dble/alarm/NoAlert.java
@@ -5,28 +5,28 @@
package com.actiontech.dble.alarm;
-import java.util.Map;
+import com.actiontech.dble.cluster.bean.ClusterAlertBean;
public class NoAlert implements Alert {
+
@Override
- public void alertSelf(String code, AlertLevel level, String desc, Map labels) {
+ public void alertSelf(ClusterAlertBean bean) {
}
@Override
- public void alert(String code, AlertLevel level, String desc, String alertComponentType, String alertComponentId, Map labels) {
+ public void alert(ClusterAlertBean bean) {
}
@Override
- public boolean alertResolve(String code, AlertLevel level, String alertComponentType, String alertComponentId, Map labels) {
+ public boolean alertResolve(ClusterAlertBean bean) {
return true;
}
@Override
- public boolean alertSelfResolve(String code, AlertLevel level, Map labels) {
+ public boolean alertSelfResolve(ClusterAlertBean bean) {
return true;
}
-
}
diff --git a/src/main/java/com/actiontech/dble/alarm/ToResolveContainer.java b/src/main/java/com/actiontech/dble/alarm/ToResolveContainer.java
index 40d42c5b0..f3c9c7947 100644
--- a/src/main/java/com/actiontech/dble/alarm/ToResolveContainer.java
+++ b/src/main/java/com/actiontech/dble/alarm/ToResolveContainer.java
@@ -20,4 +20,5 @@ public final class ToResolveContainer {
public static final Set DATA_NODE_LACK = Collections.newSetFromMap(new ConcurrentHashMap());
public static final Set CREATE_CONN_FAIL = Collections.newSetFromMap(new ConcurrentHashMap());
public static final Set REACH_MAX_CON = Collections.newSetFromMap(new ConcurrentHashMap());
+ public static final Set XA_WRITE_CHECK_POINT_FAIL = Collections.newSetFromMap(new ConcurrentHashMap());
}
diff --git a/src/main/java/com/actiontech/dble/alarm/UcoreAlert.java b/src/main/java/com/actiontech/dble/alarm/UcoreAlert.java
index ccbc403c0..2a269d60b 100644
--- a/src/main/java/com/actiontech/dble/alarm/UcoreAlert.java
+++ b/src/main/java/com/actiontech/dble/alarm/UcoreAlert.java
@@ -10,7 +10,6 @@ import com.actiontech.dble.cluster.ClusterHelper;
import com.actiontech.dble.cluster.ClusterParamCfg;
import com.actiontech.dble.cluster.bean.ClusterAlertBean;
-import java.util.Map;
public final class UcoreAlert implements Alert {
private static final String SOURCE_COMPONENT_TYPE = "dble";
@@ -26,49 +25,32 @@ public final class UcoreAlert implements Alert {
}
@Override
- public void alertSelf(String code, AlertLevel level, String desc, Map labels) {
- alert(code, level, desc, SOURCE_COMPONENT_TYPE, ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID), labels);
+ public void alertSelf(ClusterAlertBean alert) {
+ alert(alert.setAlertComponentType(SOURCE_COMPONENT_TYPE).setAlertComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID)));
}
@Override
- public void alert(String code, AlertLevel level, String desc, String alertComponentType, String alertComponentId, Map labels) {
- ClusterAlertBean alert = new ClusterAlertBean();
- alert.setCode(code);
- alert.setDesc(desc);
- alert.setLevel(level.toString());
- alert.setSourceComponentType(SOURCE_COMPONENT_TYPE);
- alert.setSourceComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID));
- alert.setAlertComponentId(alertComponentId);
- alert.setAlertComponentType(alertComponentType);
- alert.setServerId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_SERVER_ID));
- alert.setTimestampUnix(System.currentTimeMillis() * 1000000);
- if (labels != null) {
- alert.setLabels(labels);
- }
+ public void alert(ClusterAlertBean alert) {
+ alert.setSourceComponentType(SOURCE_COMPONENT_TYPE).
+ setSourceComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID)).
+ setServerId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_SERVER_ID)).
+ setTimestampUnix(System.currentTimeMillis() * 1000000);
ClusterHelper.alert(alert);
}
@Override
- public boolean alertResolve(String code, AlertLevel level, String alertComponentType, String alertComponentId, Map labels) {
- ClusterAlertBean alert = new ClusterAlertBean();
- alert.setCode(code);
- alert.setDesc("");
- alert.setLevel(level.toString());
- alert.setSourceComponentType(SOURCE_COMPONENT_TYPE);
- alert.setSourceComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID));
- alert.setAlertComponentId(alertComponentId);
- alert.setAlertComponentType(alertComponentType);
- alert.setServerId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_SERVER_ID));
- alert.setResolveTimestampUnix(System.currentTimeMillis() * 1000000);
- if (labels != null) {
- alert.setLabels(labels);
- }
+ public boolean alertResolve(ClusterAlertBean alert) {
+ alert.setDesc("").
+ setSourceComponentType(SOURCE_COMPONENT_TYPE).
+ setSourceComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID)).
+ setServerId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_SERVER_ID)).
+ setResolveTimestampUnix(System.currentTimeMillis() * 1000000);
return ClusterHelper.alertResolve(alert);
}
@Override
- public boolean alertSelfResolve(String code, AlertLevel level, Map labels) {
- return alertResolve(code, level, SOURCE_COMPONENT_TYPE, ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID), labels);
+ public boolean alertSelfResolve(ClusterAlertBean alert) {
+ return alertResolve(alert.setAlertComponentType(SOURCE_COMPONENT_TYPE).setAlertComponentId(ClusterGeneralConfig.getInstance().getValue(ClusterParamCfg.CLUSTER_CFG_MYID)));
}
}
diff --git a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDatasource.java b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDatasource.java
index a8bea28f8..300317615 100644
--- a/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDatasource.java
+++ b/src/main/java/com/actiontech/dble/backend/datasource/PhysicalDatasource.java
@@ -254,9 +254,8 @@ public abstract class PhysicalDatasource {
}
if (ToResolveContainer.CREATE_CONN_FAIL.contains(this.getHostConfig().getName() + "-" + this.getConfig().getHostName())) {
Map labels = AlertUtil.genSingleLabel("data_host", this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
- if (AlertUtil.alertResolve(AlarmCode.CREATE_CONN_FAIL, Alert.AlertLevel.WARN, "mysql", this.getConfig().getId(), labels)) {
- ToResolveContainer.CREATE_CONN_FAIL.remove(this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
- }
+ AlertUtil.alertResolve(AlarmCode.CREATE_CONN_FAIL, Alert.AlertLevel.WARN, "mysql", this.getConfig().getId(),
+ labels, ToResolveContainer.CREATE_CONN_FAIL, this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
}
} catch (IOException e) {
String errMsg = "create connection err ";
@@ -326,9 +325,8 @@ public abstract class PhysicalDatasource {
String schema) {
if (ToResolveContainer.CREATE_CONN_FAIL.contains(this.getHostConfig().getName() + "-" + this.getConfig().getHostName())) {
Map labels = AlertUtil.genSingleLabel("data_host", this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
- if (AlertUtil.alertResolve(AlarmCode.CREATE_CONN_FAIL, Alert.AlertLevel.WARN, "mysql", this.getConfig().getId(), labels)) {
- ToResolveContainer.CREATE_CONN_FAIL.remove(this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
- }
+ AlertUtil.alertResolve(AlarmCode.CREATE_CONN_FAIL, Alert.AlertLevel.WARN, "mysql", this.getConfig().getId(), labels,
+ ToResolveContainer.CREATE_CONN_FAIL, this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
}
takeCon(conn, schema);
conn.setAttachment(attachment);
@@ -381,9 +379,9 @@ public abstract class PhysicalDatasource {
} else { // create connection
if (ToResolveContainer.REACH_MAX_CON.contains(this.getHostConfig().getName() + "-" + this.getConfig().getHostName())) {
Map labels = AlertUtil.genSingleLabel("data_host", this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
- if (AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", this.getConfig().getId(), labels)) {
- ToResolveContainer.REACH_MAX_CON.remove(this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
- }
+ AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", this.getConfig().getId(), labels,
+ ToResolveContainer.REACH_MAX_CON, this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
+
}
LOGGER.info("no idle connection in pool,create new connection for " + this.name + " of schema " + schema);
createNewConnection(handler, attachment, schema);
@@ -404,9 +402,8 @@ public abstract class PhysicalDatasource {
} else { // create connection
if (ToResolveContainer.REACH_MAX_CON.contains(this.getHostConfig().getName() + "-" + this.getConfig().getHostName())) {
Map labels = AlertUtil.genSingleLabel("data_host", this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
- if (AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", this.getConfig().getId(), labels)) {
- ToResolveContainer.REACH_MAX_CON.remove(this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
- }
+ AlertUtil.alertResolve(AlarmCode.REACH_MAX_CON, Alert.AlertLevel.WARN, "dble", this.getConfig().getId(), labels,
+ ToResolveContainer.REACH_MAX_CON, this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
}
LOGGER.info("no ilde connection in pool,create new connection for " + this.name + " of schema " + schema);
try {
@@ -415,9 +412,8 @@ public abstract class PhysicalDatasource {
con = simpleHandler.getBackConn();
if (ToResolveContainer.CREATE_CONN_FAIL.contains(this.getHostConfig().getName() + "-" + this.getConfig().getHostName())) {
Map labels = AlertUtil.genSingleLabel("data_host", this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
- if (AlertUtil.alertResolve(AlarmCode.CREATE_CONN_FAIL, Alert.AlertLevel.WARN, "mysql", this.getConfig().getId(), labels)) {
- ToResolveContainer.CREATE_CONN_FAIL.remove(this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
- }
+ AlertUtil.alertResolve(AlarmCode.CREATE_CONN_FAIL, Alert.AlertLevel.WARN, "mysql", this.getConfig().getId(), labels,
+ ToResolveContainer.CREATE_CONN_FAIL, this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
}
} catch (IOException e) {
Map labels = AlertUtil.genSingleLabel("data_host", this.getHostConfig().getName() + "-" + this.getConfig().getHostName());
diff --git a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java
index c9ef9f2a1..277a93ef2 100644
--- a/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java
+++ b/src/main/java/com/actiontech/dble/backend/heartbeat/MySQLHeartbeat.java
@@ -150,7 +150,7 @@ public class MySQLHeartbeat extends DBHeartbeat {
private void setOk() {
if (this.status != OK_STATUS) {
Map labels = AlertUtil.genSingleLabel("data_host", this.source.getHostConfig().getName() + "-" + this.source.getConfig().getHostName());
- AlertUtil.alertResolve(AlarmCode.HEARTBEAT_FAIL, Alert.AlertLevel.WARN, "mysql", this.source.getConfig().getId(), labels);
+ AlertUtil.alertResolve(AlarmCode.HEARTBEAT_FAIL, Alert.AlertLevel.WARN, "mysql", this.source.getConfig().getId(), labels, null, null);
}
switch (status) {
case TIMEOUT_STATUS:
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java
index 896f0e08c..130092c4d 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XACommitNodesHandler.java
@@ -40,6 +40,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
private Lock lockForErrorHandle = new ReentrantLock();
private Condition sendFinished = lockForErrorHandle.newCondition();
private volatile boolean sendFinishedFlag = false;
+
public XACommitNodesHandler(NonBlockingSession session) {
super(session);
}
@@ -82,6 +83,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
}
+
@Override
public void clearResources() {
tryCommitTimes = 0;
@@ -394,7 +396,7 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
session.cancelableStatusSet(NonBlockingSession.CANCEL_STATUS_INIT);
byte[] toSend = sendData;
session.clearResources(false);
- AlertUtil.alertSelfResolve(AlarmCode.XA_BACKGROUND_RETRY_FAIL, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("XA_ID", session.getSessionXaID()));
+ AlertUtil.alertSelfResolve(AlarmCode.XA_BACKGROUND_RETRY_FAIL, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("XA_ID", session.getSessionXaID()), null, null);
if (!session.closed()) {
setResponseTime(isSuccess);
session.getSource().write(toSend);
@@ -441,7 +443,6 @@ public class XACommitNodesHandler extends AbstractCommitNodesHandler {
}
-
private void waitUntilSendFinish() {
this.lockForErrorHandle.lock();
try {
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java
index d2140f56b..2bdbd1aa6 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/nio/handler/transaction/xa/XARollbackNodesHandler.java
@@ -408,7 +408,7 @@ public class XARollbackNodesHandler extends AbstractRollbackNodesHandler {
session.setXaState(TxState.TX_INITIALIZE_STATE);
byte[] toSend = sendData;
session.clearResources(false);
- AlertUtil.alertSelfResolve(AlarmCode.XA_BACKGROUND_RETRY_FAIL, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("XA_ID", session.getSessionXaID()));
+ AlertUtil.alertSelfResolve(AlarmCode.XA_BACKGROUND_RETRY_FAIL, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("XA_ID", session.getSessionXaID()), null, null);
if (!session.closed()) {
setResponseTime(false);
session.getSource().write(toSend);
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/xa/XAStateLog.java b/src/main/java/com/actiontech/dble/backend/mysql/xa/XAStateLog.java
index ac1b1e972..22e5a97fc 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/xa/XAStateLog.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/xa/XAStateLog.java
@@ -41,6 +41,7 @@ public final class XAStateLog {
}
}
+ public static final String XA_ALERT_FLAG = "XA_ALERT_FLAG";
private static final Repository IN_MEMORY_REPOSITORY = new InMemoryRepository();
private static ReentrantLock lock = new ReentrantLock();
private static AtomicBoolean hasLeader = new AtomicBoolean(false);
diff --git a/src/main/java/com/actiontech/dble/backend/mysql/xa/recovery/impl/FileSystemRepository.java b/src/main/java/com/actiontech/dble/backend/mysql/xa/recovery/impl/FileSystemRepository.java
index 608d733ca..58e4e410f 100644
--- a/src/main/java/com/actiontech/dble/backend/mysql/xa/recovery/impl/FileSystemRepository.java
+++ b/src/main/java/com/actiontech/dble/backend/mysql/xa/recovery/impl/FileSystemRepository.java
@@ -9,6 +9,7 @@ import com.actiontech.dble.DbleServer;
import com.actiontech.dble.alarm.AlarmCode;
import com.actiontech.dble.alarm.Alert;
import com.actiontech.dble.alarm.AlertUtil;
+import com.actiontech.dble.alarm.ToResolveContainer;
import com.actiontech.dble.backend.mysql.xa.*;
import com.actiontech.dble.backend.mysql.xa.recovery.DeserializationException;
import com.actiontech.dble.backend.mysql.xa.recovery.Repository;
@@ -24,6 +25,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
+import static com.actiontech.dble.backend.mysql.xa.XAStateLog.XA_ALERT_FLAG;
+
/**
* Created by zhangchao on 2016/10/13.
*/
@@ -202,15 +205,15 @@ public class FileSystemRepository implements Repository {
}
rwChannel.force(false);
file.discardBackupVersion();
- if (XAStateLog.isWriteAlert()) {
- boolean resolved = AlertUtil.alertSelfResolve(AlarmCode.XA_WRITE_CHECK_POINT_FAIL, Alert.AlertLevel.WARN, null);
- XAStateLog.setWriteAlert(resolved);
+ if (ToResolveContainer.XA_WRITE_CHECK_POINT_FAIL.size() > 0) {
+ AlertUtil.alertSelfResolve(AlarmCode.XA_WRITE_CHECK_POINT_FAIL, Alert.AlertLevel.WARN, null, null, null);
+ ToResolveContainer.XA_WRITE_CHECK_POINT_FAIL.remove(XA_ALERT_FLAG);
}
return true;
} catch (Exception e) {
LOGGER.warn("Failed to write checkpoint", e);
AlertUtil.alertSelf(AlarmCode.XA_WRITE_CHECK_POINT_FAIL, Alert.AlertLevel.WARN, "Failed to write checkpoint" + e.getMessage(), null);
- XAStateLog.setWriteAlert(true);
+ ToResolveContainer.XA_WRITE_CHECK_POINT_FAIL.add(XA_ALERT_FLAG);
return false;
}
}
diff --git a/src/main/java/com/actiontech/dble/cluster/bean/ClusterAlertBean.java b/src/main/java/com/actiontech/dble/cluster/bean/ClusterAlertBean.java
index ab9621648..e94fe75bf 100644
--- a/src/main/java/com/actiontech/dble/cluster/bean/ClusterAlertBean.java
+++ b/src/main/java/com/actiontech/dble/cluster/bean/ClusterAlertBean.java
@@ -21,8 +21,9 @@ public class ClusterAlertBean {
return labels;
}
- public void setLabels(Map labels) {
- this.labels = labels;
+ public ClusterAlertBean setLabels(Map xlabels) {
+ this.labels = xlabels;
+ return this;
}
Map labels;
@@ -31,79 +32,89 @@ public class ClusterAlertBean {
return code;
}
- public void setCode(String code) {
- this.code = code;
+ public ClusterAlertBean setCode(String xcode) {
+ this.code = xcode;
+ return this;
}
public String getLevel() {
return level;
}
- public void setLevel(String level) {
- this.level = level;
+ public ClusterAlertBean setLevel(String xlevel) {
+ this.level = xlevel;
+ return this;
}
public String getDesc() {
return desc;
}
- public void setDesc(String desc) {
- this.desc = desc;
+ public ClusterAlertBean setDesc(String xdesc) {
+ this.desc = xdesc;
+ return this;
}
public String getSourceComponentType() {
return sourceComponentType;
}
- public void setSourceComponentType(String sourceComponentType) {
- this.sourceComponentType = sourceComponentType;
+ public ClusterAlertBean setSourceComponentType(String xsourceComponentType) {
+ this.sourceComponentType = xsourceComponentType;
+ return this;
}
public String getSourceComponentId() {
return sourceComponentId;
}
- public void setSourceComponentId(String sourceComponentId) {
- this.sourceComponentId = sourceComponentId;
+ public ClusterAlertBean setSourceComponentId(String xsourceComponentId) {
+ this.sourceComponentId = xsourceComponentId;
+ return this;
}
public String getAlertComponentType() {
return alertComponentType;
}
- public void setAlertComponentType(String alertComponentType) {
- this.alertComponentType = alertComponentType;
+ public ClusterAlertBean setAlertComponentType(String xalertComponentType) {
+ this.alertComponentType = xalertComponentType;
+ return this;
}
public String getAlertComponentId() {
return alertComponentId;
}
- public void setAlertComponentId(String alertComponentId) {
- this.alertComponentId = alertComponentId;
+ public ClusterAlertBean setAlertComponentId(String xalertComponentId) {
+ this.alertComponentId = xalertComponentId;
+ return this;
}
public String getServerId() {
return serverId;
}
- public void setServerId(String serverId) {
- this.serverId = serverId;
+ public ClusterAlertBean setServerId(String xserverId) {
+ this.serverId = xserverId;
+ return this;
}
public long getTimestampUnix() {
return timestampUnix;
}
- public void setTimestampUnix(long timestampUnix) {
- this.timestampUnix = timestampUnix;
+ public ClusterAlertBean setTimestampUnix(long xtimestampUnix) {
+ this.timestampUnix = xtimestampUnix;
+ return this;
}
public long getResolveTimestampUnix() {
return resolveTimestampUnix;
}
- public void setResolveTimestampUnix(long resolveTimestampUnix) {
- this.resolveTimestampUnix = resolveTimestampUnix;
+ public ClusterAlertBean setResolveTimestampUnix(long xresolveTimestampUnix) {
+ this.resolveTimestampUnix = xresolveTimestampUnix;
+ return this;
}
}
diff --git a/src/main/java/com/actiontech/dble/manager/handler/CreateDatabaseHandler.java b/src/main/java/com/actiontech/dble/manager/handler/CreateDatabaseHandler.java
index 6fe8e8eff..9e3866ce9 100644
--- a/src/main/java/com/actiontech/dble/manager/handler/CreateDatabaseHandler.java
+++ b/src/main/java/com/actiontech/dble/manager/handler/CreateDatabaseHandler.java
@@ -79,9 +79,8 @@ public final class CreateDatabaseHandler {
if (ToResolveContainer.DATA_NODE_LACK.contains(key)) {
Map labels = AlertUtil.genSingleLabel("data_host", ds.getHostConfig().getName() + "-" + ds.getConfig().getHostName());
labels.put("data_node", dataNode);
- if (AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels)) {
- ToResolveContainer.DATA_NODE_LACK.remove(key);
- }
+ AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels,
+ ToResolveContainer.DATA_NODE_LACK, key);
}
}
numberCount.decrementAndGet();
diff --git a/src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java b/src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java
index 62a2cd34d..5e8236158 100644
--- a/src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java
+++ b/src/main/java/com/actiontech/dble/meta/ProxyMetaManager.java
@@ -602,8 +602,8 @@ public class ProxyMetaManager {
for (String dataNode : tbConfig.getDataNodes()) {
showDataNode = dataNode;
String tableId = "DataNode[" + dataNode + "]:Table[" + tableName + "]";
- if (ToResolveContainer.TABLE_LACK.contains(tableId) && AlertUtil.alertSelfResolve(AlarmCode.TABLE_LACK, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId))) {
- ToResolveContainer.TABLE_LACK.remove(tableId);
+ if (ToResolveContainer.TABLE_LACK.contains(tableId)) {
+ AlertUtil.alertSelfResolve(AlarmCode.TABLE_LACK, Alert.AlertLevel.WARN, AlertUtil.genSingleLabel("TABLE", tableId), ToResolveContainer.TABLE_LACK, tableId);
}
}
}
diff --git a/src/main/java/com/actiontech/dble/meta/table/AbstractTablesMetaHandler.java b/src/main/java/com/actiontech/dble/meta/table/AbstractTablesMetaHandler.java
index 4eb3902cb..6c259cecc 100644
--- a/src/main/java/com/actiontech/dble/meta/table/AbstractTablesMetaHandler.java
+++ b/src/main/java/com/actiontech/dble/meta/table/AbstractTablesMetaHandler.java
@@ -123,9 +123,8 @@ public abstract class AbstractTablesMetaHandler {
if (ds != null && ToResolveContainer.DATA_NODE_LACK.contains(key)) {
Map labels = AlertUtil.genSingleLabel("data_host", ds.getHostConfig().getName() + "-" + ds.getConfig().getHostName());
labels.put("data_node", dataNode);
- if (AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels)) {
- ToResolveContainer.DATA_NODE_LACK.remove(key);
- }
+ AlertUtil.alertResolve(AlarmCode.DATA_NODE_LACK, Alert.AlertLevel.WARN, "mysql", ds.getConfig().getId(), labels,
+ ToResolveContainer.DATA_NODE_LACK, key);
}
List