/*
 * Copyright 2009-2010 WorldWide Conferencing, LLC
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package net.liftweb {
package actor {

import common._

trait ILAExecute {
  def execute(f: () => Unit): Unit
  def shutdown(): Unit
}

object LAScheduler {
  @volatile
  var onSameThread = false

  /**
   * Set this variable to the number of threads to allocate in the thread pool
   */
  @volatile var threadPoolSize = 16 // issue 194

  @volatile
  var createExecutor: () => ILAExecute = () => {
    new ILAExecute {
      import _root_.java.util.concurrent.{Executors, Executor}

      private val es = Executors.newFixedThreadPool(threadPoolSize)

      def execute(f: () => Unit): Unit =
      es.execute(new Runnable{def run() {f()}})

      def shutdown(): Unit = {
        es.shutdown()
      }
    }
  }

  @volatile
  var exec: ILAExecute = _

  def execute(f: () => Unit) {
    synchronized {
      if (exec eq null) {
        exec = createExecutor()
      }
      exec.execute(f)
    }
  }

  def shutdown() {
    synchronized {
      if (exec ne null) {
        exec.shutdown()
      }
      
      exec = null
    }
  }
}

trait SpecializedLiftActor[T] extends SimpleActor[T]  {
  @volatile private[this] var processing = false
  private[this] val baseMailbox: MailboxItem = new SpecialMailbox
  @volatile private[this] var msgList: List[T] = Nil
  @volatile private[this] var priorityMsgList: List[T] = Nil
  @volatile private[this] var startCnt = 0

  private class MailboxItem(val item: T) {
    var next: MailboxItem = _
    var prev: MailboxItem = _

    def find(f: MailboxItem => Boolean): Box[MailboxItem] =
    if (f(this)) Full(this) else next.find(f)

    def remove() {
      val newPrev = prev
      prev.next = next
      next.prev = prev
    }

    def insertAfter(newItem: MailboxItem): MailboxItem = {
      next.prev = newItem
      newItem.prev = this
      newItem.next = this.next
      next = newItem
      newItem
    }

    def insertBefore(newItem: MailboxItem): MailboxItem = {
      prev.next = newItem
      newItem.prev = this.prev
      newItem.next = this
      prev = newItem
      newItem
    }
  }

  private class SpecialMailbox extends MailboxItem(null.asInstanceOf[T]) {
    override def find(f: MailboxItem => Boolean): Box[MailboxItem] = Empty
    next = this
    prev = this
  }

  def !(msg: T): Unit = {
    val toDo: () => Unit = baseMailbox.synchronized {
      msgList ::= msg
      if (!processing) {
        if (LAScheduler.onSameThread) {
          processing = true
          () => processMailbox(true)
        } else {
          if (startCnt == 0) {
            startCnt += 1
            () => LAScheduler.execute(() => processMailbox(false))
          } else
          () => {}
        }
      }
      else () => {}
    }
    toDo()
  }

  /**
   * This method inserts the message at the head of the mailbox
   * It's protected because this functionality may or may not want
   * to be exposed'
   */
  protected def insertMsgAtHeadOfQueue_!(msg: T): Unit = {
     val toDo: () => Unit = baseMailbox.synchronized {
      this.priorityMsgList ::= msg
      if (!processing) {
        if (LAScheduler.onSameThread) {
          processing = true
          () => processMailbox(true)
        } else {
          if (startCnt == 0) {
            startCnt += 1
            () => LAScheduler.execute(() => processMailbox(false))
          } else
          () => {}
        }
      }
      else () => {}
    }
    toDo()
  }

  private def processMailbox(ignoreProcessing: Boolean) {
    around {
      proc2(ignoreProcessing)
    }
  }

/**
 * A list of LoanWrappers that will be executed around the evaluation of mailboxes
 */
protected def aroundLoans: List[CommonLoanWrapper] = Nil

/**
 * You can wrap calls around the evaluation of the mailbox.  This allows you to set up
 * the environment
 */
protected def around[R](f: => R): R = aroundLoans match {
  case Nil => f
  case xs => CommonLoanWrapper(xs)(f)
}

  private def proc2(ignoreProcessing: Boolean) {
    var clearProcessing = true
    baseMailbox.synchronized {
      if (!ignoreProcessing && processing) return
      processing = true
      if (startCnt > 0) startCnt = 0
    }

    val eh = exceptionHandler

    def putListIntoMB(): Unit = {
      if (!priorityMsgList.isEmpty) {
      priorityMsgList.foldRight(baseMailbox)((msg, mb) => mb.insertAfter(new MailboxItem(msg)))
      priorityMsgList = Nil
      }

      if (!msgList.isEmpty) {
      msgList.foldLeft(baseMailbox)((mb, msg) => mb.insertBefore(new MailboxItem(msg)))
      msgList = Nil
      }
    }

    try {
      while (true) {
        baseMailbox.synchronized {
          putListIntoMB()
        }

            var keepOnDoingHighPriory = true

            while (keepOnDoingHighPriory) {
              val hiPriPfBox = highPriorityReceive
              if (hiPriPfBox.isDefined) {
                val hiPriPf = hiPriPfBox.open_!
                baseMailbox.next.find(mb => testTranslate(hiPriPf.isDefinedAt)(mb.item)) match {
                  case Full(mb) =>
                    mb.remove()
                    try {
                      execTranslate(hiPriPf)(mb.item)
                    } catch {
                      case e: Exception => if (eh.isDefinedAt(e)) eh(e)
                    }
                  case _ =>
                    baseMailbox.synchronized {
                      if (msgList.isEmpty) {
                        keepOnDoingHighPriory = false
                      }
                      else {
                        putListIntoMB()
                      }
                    }
                } }
              else {keepOnDoingHighPriory = false}
            }

            val pf = messageHandler

        baseMailbox.next.find(mb => testTranslate(pf.isDefinedAt)(mb.item)) match {
          case Full(mb) =>
            mb.remove()
            try {
              execTranslate(pf)(mb.item)
            } catch {
              case e: Exception => if (eh.isDefinedAt(e)) eh(e)
            }
          case _ =>
            baseMailbox.synchronized {
              if (msgList.isEmpty) {
                processing = false
                clearProcessing = false
                return
              }
              else {
                putListIntoMB()
              }
            }
        }
      }
    } catch {
      case e =>
        if (eh.isDefinedAt(e)) eh(e)
        throw e
    } finally {
      if (clearProcessing) {
        baseMailbox.synchronized {
          processing = false
        }
      }
    }
  }

  protected def testTranslate(f: T => Boolean)(v: T): Boolean = f(v)

  protected def execTranslate(f: T => Unit)(v: T): Unit = f(v)

  protected def messageHandler: PartialFunction[T, Unit]

  protected def highPriorityReceive: Box[PartialFunction[T, Unit]] = Empty

  protected def exceptionHandler: PartialFunction[Throwable, Unit] = {
    case e => ActorLogger.error("Actor threw an exception", e)
  }
}

object ActorLogger extends Logger {
}

private final case class MsgWithResp(msg: Any, future: LAFuture[Any])

trait LiftActor extends SpecializedLiftActor[Any]
with GenericActor[Any]
with ForwardableActor[Any, Any] {
  @volatile
  private[this] var responseFuture: LAFuture[Any] = null



  protected final def forwardMessageTo(msg: Any, forwardTo: TypedActor[Any, Any]) {
    if (null ne responseFuture) {
      forwardTo match {
        case la: LiftActor => la ! MsgWithResp(msg, responseFuture)
        case other =>
          reply(other !? msg)
      }
    } else forwardTo ! msg
  }

  def !<(msg: Any): LAFuture[Any] = {
    val future = new LAFuture[Any]
    this ! MsgWithResp(msg, future)
    future
  }

  def !?(msg: Any): Any = {
    val future = new LAFuture[Any]
    this ! MsgWithResp(msg, future)
    future.get
  }

  /**
   * Compatible with Scala Actors' !? method
   */
  def !?(timeout: Long, message: Any): Box[Any] =
    this !! (message, timeout)


  def !!(msg: Any, timeout: Long): Box[Any] = {
    val future = new LAFuture[Any]
    this ! MsgWithResp(msg, future)
    future.get(timeout)
  }

  def !!(msg: Any): Box[Any] = {
    val future = new LAFuture[Any]
    this ! MsgWithResp(msg, future)
    Full(future.get)
  }

  override protected def testTranslate(f: Any => Boolean)(v: Any) = v match {
    case MsgWithResp(msg, _) => f(msg)
    case v => f(v)
  }

  override protected def execTranslate(f: Any => Unit)(v: Any) = v match {
    case MsgWithResp(msg, future) =>
      responseFuture = future
      try {
        f(msg)
      } finally {
        responseFuture = null
      }
    case v => f(v)
  }


  protected def reply(v: Any) {
    if (null ne responseFuture) {
      responseFuture.satisfy(v)
    }
  }
}

}
}