Skip to content

Commit

Permalink
Merge branch '0.6.x' as v0.6.1 release
Browse files Browse the repository at this point in the history
  • Loading branch information
dwhjames committed Aug 13, 2014
2 parents 21cbd37 + ec1abae commit 8501b1c
Show file tree
Hide file tree
Showing 4 changed files with 89 additions and 68 deletions.
2 changes: 1 addition & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ organization in ThisBuild := "com.pellucid"

licenses in ThisBuild += ("Apache-2.0", url("http://www.apache.org/licenses/LICENSE-2.0"))

version in ThisBuild := "0.6.0"
version in ThisBuild := "0.6.1"

scalaVersion in ThisBuild := "2.11.1"

Expand Down
7 changes: 1 addition & 6 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
@@ -1,7 +1,2 @@

resolvers += Resolver.url(
"bintray-sbt-plugin-releases",
url("http://dl.bintray.com/content/sbt/sbt-plugin-releases"))(
Resolver.ivyStylePatterns)

addSbtPlugin("me.lessis" % "bintray-sbt" % "0.1.1")
addSbtPlugin("me.lessis" % "bintray-sbt" % "0.1.2")
59 changes: 1 addition & 58 deletions scratch/src/main/scala/Scratch.scala
Original file line number Diff line number Diff line change
Expand Up @@ -796,76 +796,19 @@ object ScratchS3 {

// Thread.sleep(100)

upload.addProgressListener(new ProgressListener {
override def progressChanged(progressEvent: ProgressEvent): Unit = {
progressEvent.getEventType match {
case ProgressEventType.CLIENT_REQUEST_FAILED_EVENT =>
logger.info("CLIENT_REQUEST_FAILED_EVENT")
case ProgressEventType.CLIENT_REQUEST_RETRY_EVENT =>
logger.info("CLIENT_REQUEST_RETRY_EVENT")
case ProgressEventType.CLIENT_REQUEST_STARTED_EVENT =>
logger.info("CLIENT_REQUEST_STARTED_EVENT")
case ProgressEventType.CLIENT_REQUEST_SUCCESS_EVENT =>
logger.info("CLIENT_REQUEST_SUCCESS_EVENT")
case ProgressEventType.HTTP_REQUEST_COMPLETED_EVENT =>
logger.info("HTTP_REQUEST_COMPLETED_EVENT")
case ProgressEventType.HTTP_REQUEST_CONTENT_RESET_EVENT =>
logger.info("HTTP_REQUEST_CONTENT_RESET_EVENT")
case ProgressEventType.HTTP_REQUEST_STARTED_EVENT =>
logger.info("HTTP_REQUEST_STARTED_EVENT")
case ProgressEventType.HTTP_RESPONSE_COMPLETED_EVENT =>
logger.info("HTTP_RESPONSE_COMPLETED_EVENT")
case ProgressEventType.HTTP_RESPONSE_CONTENT_RESET_EVENT =>
logger.info("HTTP_RESPONSE_CONTENT_RESET_EVENT")
case ProgressEventType.HTTP_RESPONSE_STARTED_EVENT =>
logger.info("HTTP_RESPONSE_STARTED_EVENT")
case ProgressEventType.REQUEST_BYTE_TRANSFER_EVENT =>
logger.info("REQUEST_BYTE_TRANSFER_EVENT")
case ProgressEventType.REQUEST_CONTENT_LENGTH_EVENT =>
logger.info("REQUEST_CONTENT_LENGTH_EVENT")
case ProgressEventType.RESPONSE_BYTE_DISCARD_EVENT =>
logger.info("RESPONSE_BYTE_DISCARD_EVENT")
case ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT =>
logger.info("RESPONSE_BYTE_TRANSFER_EVENT")
case ProgressEventType.RESPONSE_CONTENT_LENGTH_EVENT =>
logger.info("RESPONSE_CONTENT_LENGTH_EVENT")
case ProgressEventType.TRANSFER_CANCELED_EVENT =>
logger.info("TRANSFER_CANCELED_EVENT")
case ProgressEventType.TRANSFER_COMPLETED_EVENT =>
logger.info("TRANSFER_COMPLETED_EVENT")
case ProgressEventType.TRANSFER_FAILED_EVENT =>
logger.info("TRANSFER_FAILED_EVENT")
case ProgressEventType.TRANSFER_PART_COMPLETED_EVENT =>
logger.info("TRANSFER_PART_COMPLETED_EVENT")
case ProgressEventType.TRANSFER_PART_FAILED_EVENT =>
logger.info("TRANSFER_PART_FAILED_EVENT")
case ProgressEventType.TRANSFER_PART_STARTED_EVENT =>
logger.info("TRANSFER_PART_STARTED_EVENT")
case ProgressEventType.TRANSFER_PREPARING_EVENT =>
logger.info("TRANSFER_PREPARING_EVENT")
case ProgressEventType.TRANSFER_STARTED_EVENT =>
logger.info("TRANSFER_STARTED_EVENT")
case _ =>
logger.warn("unrecognized event code")
}
}
})


try {
Await.result(
FutureTransfer.listenFor(upload),
10.seconds
)
// logger.error("aws exception", upload.waitForException())
// logger.info(upload.waitForUploadResult().toString)
logger.info(upload.waitForUploadResult().toString)
} catch {
case ex: RuntimeException =>
logger.error(ex.getMessage)
}
logger.info(s"upload is done: ${upload.isDone()}")

Thread.sleep(5000)

transferManager.shutdownNow()
client.shutdown()
Expand Down
89 changes: 86 additions & 3 deletions src/main/scala/s3/s3.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ import com.amazonaws.services.s3._
import com.amazonaws.services.s3.model._
import com.amazonaws.services.s3.transfer.Transfer

import org.slf4j.{Logger, LoggerFactory}


private[s3] class S3ThreadFactory extends ThreadFactory {
private val count = new AtomicLong(0L)
Expand Down Expand Up @@ -456,6 +458,8 @@ class AmazonS3ScalaClient(
*/
object FutureTransfer {

private val logger: Logger = LoggerFactory.getLogger("com.pellucid.wrap.s3.FutureTransfer")

/**
* Attach a listener to an S3 Transfer and return it as a Future.
*
Expand All @@ -469,6 +473,9 @@ object FutureTransfer {
*
* In essence, this helper just gives back the transfer when it is done.
*
* The detailed progress of the transfer is logged at debug level to the
* `com.pellucid.wrap.s3.FutureTransfer` logger.
*
* @tparam T
* a subtype of Transfer.
* @param transfer
Expand All @@ -478,6 +485,65 @@ object FutureTransfer {
* @see [[http://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/event/ProgressListener.html ProgressListener]]
*/
def listenFor[T <: Transfer](transfer: T): Future[transfer.type] = {
val transferDescription = transfer.getDescription
def debugLog(eventType: String): Unit = {
logger.debug(s"$eventType : $transferDescription")
}
def logProgressEvent(progressEvent: ProgressEvent): Unit = {
if (logger.isDebugEnabled) {
progressEvent.getEventType match {
case ProgressEventType.CLIENT_REQUEST_FAILED_EVENT =>
debugLog("CLIENT_REQUEST_FAILED_EVENT")
case ProgressEventType.CLIENT_REQUEST_RETRY_EVENT =>
debugLog("CLIENT_REQUEST_RETRY_EVENT")
case ProgressEventType.CLIENT_REQUEST_STARTED_EVENT =>
debugLog("CLIENT_REQUEST_STARTED_EVENT")
case ProgressEventType.CLIENT_REQUEST_SUCCESS_EVENT =>
debugLog("CLIENT_REQUEST_SUCCESS_EVENT")
case ProgressEventType.HTTP_REQUEST_COMPLETED_EVENT =>
debugLog("HTTP_REQUEST_COMPLETED_EVENT")
case ProgressEventType.HTTP_REQUEST_CONTENT_RESET_EVENT =>
debugLog("HTTP_REQUEST_CONTENT_RESET_EVENT")
case ProgressEventType.HTTP_REQUEST_STARTED_EVENT =>
debugLog("HTTP_REQUEST_STARTED_EVENT")
case ProgressEventType.HTTP_RESPONSE_COMPLETED_EVENT =>
debugLog("HTTP_RESPONSE_COMPLETED_EVENT")
case ProgressEventType.HTTP_RESPONSE_CONTENT_RESET_EVENT =>
debugLog("HTTP_RESPONSE_CONTENT_RESET_EVENT")
case ProgressEventType.HTTP_RESPONSE_STARTED_EVENT =>
debugLog("HTTP_RESPONSE_STARTED_EVENT")
case ProgressEventType.REQUEST_BYTE_TRANSFER_EVENT =>
debugLog("REQUEST_BYTE_TRANSFER_EVENT")
case ProgressEventType.REQUEST_CONTENT_LENGTH_EVENT =>
debugLog("REQUEST_CONTENT_LENGTH_EVENT")
case ProgressEventType.RESPONSE_BYTE_DISCARD_EVENT =>
debugLog("RESPONSE_BYTE_DISCARD_EVENT")
case ProgressEventType.RESPONSE_BYTE_TRANSFER_EVENT =>
debugLog("RESPONSE_BYTE_TRANSFER_EVENT")
case ProgressEventType.RESPONSE_CONTENT_LENGTH_EVENT =>
debugLog("RESPONSE_CONTENT_LENGTH_EVENT")
case ProgressEventType.TRANSFER_CANCELED_EVENT =>
debugLog("TRANSFER_CANCELED_EVENT")
case ProgressEventType.TRANSFER_COMPLETED_EVENT =>
debugLog("TRANSFER_COMPLETED_EVENT")
case ProgressEventType.TRANSFER_FAILED_EVENT =>
debugLog("TRANSFER_FAILED_EVENT")
case ProgressEventType.TRANSFER_PART_COMPLETED_EVENT =>
debugLog("TRANSFER_PART_COMPLETED_EVENT")
case ProgressEventType.TRANSFER_PART_FAILED_EVENT =>
debugLog("TRANSFER_PART_FAILED_EVENT")
case ProgressEventType.TRANSFER_PART_STARTED_EVENT =>
debugLog("TRANSFER_PART_STARTED_EVENT")
case ProgressEventType.TRANSFER_PREPARING_EVENT =>
debugLog("TRANSFER_PREPARING_EVENT")
case ProgressEventType.TRANSFER_STARTED_EVENT =>
debugLog("TRANSFER_STARTED_EVENT")
case _ =>
logger.warn(s"unrecognized progress event type for transfer $transferDescription")
}
}
}

val p = Promise[transfer.type]

/* Attach a progress listener to the transfer.
Expand All @@ -493,20 +559,37 @@ object FutureTransfer {
* the potential to induce deadlock.
*/
override def progressChanged(progressEvent: ProgressEvent): Unit = {
logProgressEvent(progressEvent)

val code = progressEvent.getEventType()
if (code == ProgressEventType.TRANSFER_CANCELED_EVENT ||
code == ProgressEventType.TRANSFER_COMPLETED_EVENT ||
code == ProgressEventType.TRANSFER_FAILED_EVENT) {
p trySuccess transfer
val success = p trySuccess transfer
if (logger.isDebugEnabled) {
if (success) {
logger.debug(s"promise successfully completed from progress listener for $transferDescription")
} else {
logger.debug(s"promise was found to be already completed from progress listener for $transferDescription")
}
}
}
}
})

/* In case the progress listener never fires due to the
* transfer already being done, poll the transfer once.
*/
if (transfer.isDone)
p trySuccess transfer
if (transfer.isDone) {
val success = p trySuccess transfer
if (logger.isDebugEnabled) {
if (success) {
logger.debug(s"promise successfully completed outside of progress listener for $transferDescription")
} else {
logger.debug(s"promise was found to be already completed outside of progress listener for $transferDescription")
}
}
}

p.future
}
Expand Down

0 comments on commit 8501b1c

Please sign in to comment.