Skip to content

Commit

Permalink
Merge pull request #9 from xinix00/master
Browse files Browse the repository at this point in the history
Improved implementation for RavenDB
  • Loading branch information
AustinWinstanley authored Sep 2, 2016
2 parents bb57d04 + e5e8c76 commit 343603f
Show file tree
Hide file tree
Showing 38 changed files with 1,543 additions and 1,538 deletions.
37 changes: 37 additions & 0 deletions src/Hangfire.Raven/Assemblies.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reflection;
using System.Text;
using System.Threading.Tasks;

namespace Hangfire.Raven
{
/// <summary>
/// Helper class for getting assemblies
/// </summary>
public static class Assemblies
{
/// <summary>
/// Get all loaded assemblies
/// </summary>
/// <returns></returns>
public static IEnumerable<Assembly> Get()
{
return AppDomain.CurrentDomain.GetAssemblies().Where(a => a != null);
}

/// <summary>
/// Get class by name within an assembly
/// </summary>
/// <param name="assembly">Assembly</param>
/// <param name="type">Name of type</param>
/// <returns></returns>
public static Type GetClass(Assembly assembly, string type)
{
return (from t in assembly.GetTypes()
where t.FullName == type //&& !t.IsAbstract && t.IsClass
select t).FirstOrDefault();
}
}
}
234 changes: 80 additions & 154 deletions src/Hangfire.Raven/DistributedLocks/RavenDistributedLock.cs
Original file line number Diff line number Diff line change
@@ -1,209 +1,135 @@
// This file is part of Hangfire.
// Copyright � 2013-2014 Sergey Odinokov.
//
// Hangfire is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as
// published by the Free Software Foundation, either version 3
// of the License, or any later version.
//
// Hangfire is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public
// License along with Hangfire. If not, see <http://www.gnu.org/licenses/>.

using System;
using System.Linq;
using System.Linq;
using System.Collections.Generic;
using System.Data;
using System.Threading;
using Hangfire.Annotations;
using HangFire.Raven;
using Hangfire.Raven.Entities;
using Hangfire.Raven.Storage;
using System;
using Raven.Json.Linq;

namespace Hangfire.Raven.DistributedLock
namespace Hangfire.Raven.DistributedLocks
{
public class RavenDistributedLock : IDisposable
{
private const string LockMode = "Exclusive";
private const string LockOwner = "Session";
private const int CommandTimeoutAdditionSeconds = 1;

private static readonly IDictionary<int, string> LockErrorMessages = new Dictionary<int, string>
{
{ -1, "The lock request timed out" },
{ -2, "The lock request was canceled" },
{ -3, "The lock request was chosen as a deadlock victim" },
{ -999, "Indicates a parameter validation or other call error" }
};

private static readonly ThreadLocal<Dictionary<string, int>> AcquiredLocks = new ThreadLocal<Dictionary<string, int>>(() => new Dictionary<string, int>());
private readonly object _lock = new object();

private RavenStorage _storage;
private string _resource;
private RavenStorageOptions _options;

private Timer _heartbeatTimer = null;
private DistributedLock _distributedLock;
private Timer _heartbeatTimer;
private TimeSpan _timeout;

private bool _completed;
private readonly List<string> _skipLocks = new List<string>()
{
"HangFire/job:"
};

public RavenDistributedLock([NotNull] RavenStorage storage, [NotNull] string resource, TimeSpan timeout, RavenStorageOptions options)
public RavenDistributedLock([NotNull] RavenStorage storage, [NotNull] string resource, TimeSpan timeout)
{
storage.ThrowIfNull("storage");
resource.ThrowIfNull("resource");

if ((timeout.TotalSeconds + CommandTimeoutAdditionSeconds) > int.MaxValue) {
throw new ArgumentException(string.Format("The timeout specified is too large. Please supply a timeout equal to or less than {0} seconds",
int.MaxValue - CommandTimeoutAdditionSeconds),
"timeout");
}

_timeout = timeout;
_storage = storage;
_resource = resource;
_options = options;

if (!AcquiredLocks.Value.ContainsKey(_resource)) {
Acquire(_resource, timeout);
AcquiredLocks.Value[_resource] = 1;
} else {
AcquiredLocks.Value[_resource]++;
}
// -- Skip some locks
if(!_skipLocks.Any(a => _resource.StartsWith(a)))
Lock();
}

public void Dispose()
{
if (_completed)
return;

_completed = true;

if (!AcquiredLocks.Value.ContainsKey(_resource))
return;

AcquiredLocks.Value[_resource]--;

if (AcquiredLocks.Value[_resource] != 0)
return;

Release(_resource);
AcquiredLocks.Value.Remove(_resource);
Release();
}

internal void Acquire(string resource, TimeSpan timeout)
private void Lock()
{
try {
RemoveDeadLocks(resource);

// Check lock
DateTime lockTimeoutTime = DateTime.Now.Add(timeout);
bool isLockedBySomeoneElse;
bool isFirstAttempt = true;
do {
using (var repository = new Repository()) {
isLockedBySomeoneElse = repository.Session.Query<DistributedLocks>()
.FirstOrDefault(t => t.Resource == resource && t.ClientId != _options.ClientId) != null;
}

if (isFirstAttempt == true) {
isFirstAttempt = false;
} else {
Thread.Sleep((int)timeout.TotalMilliseconds / 10);
}
}
while ((isLockedBySomeoneElse == true) && (lockTimeoutTime >= DateTime.Now));

// Set lock
if (isLockedBySomeoneElse == false) {
using (var repository = new Repository()) {
var distributedLocks = repository.Session.Query<DistributedLocks>().Where(t => t.Resource == resource).ToList();

if (!distributedLocks.Any()) {
distributedLocks.Add(new DistributedLocks
{
Resource = resource,
});
}

foreach (var distributedLock in distributedLocks) {
distributedLock.ClientId = _options.ClientId;
distributedLock.LockCount = 1;
distributedLock.Heartbeat = DateTime.UtcNow;

repository.Save(distributedLock);
}
}

StartHeartBeat(_resource);
} else {
throw new RavenDistributedLockException(string.Format("Could not place a lock on the resource '{0}': {1}.", resource, "The lock request timed out"));
using (var session = _storage.Repository.OpenSession())
{
_distributedLock = new DistributedLock()
{
ClientId = _storage.Options.ClientId,
Resource = _resource
};

session.Store(_distributedLock);
session.Advanced.AddExpire(_distributedLock, DateTime.UtcNow + _timeout);

try
{
// Blocking session!
session.Advanced.UseOptimisticConcurrency = true;
session.SaveChanges();
}
} catch (Exception ex) {
if (ex is RavenDistributedLockException) {
throw;
} else {
throw new RavenDistributedLockException(string.Format("Could not place a lock on the resource '{0}': {1}.", resource, "Check inner exception for details"), ex);
catch (Exception e)
{
_distributedLock = null;
throw new RavenDistributedLockException("Lock already given.", e);
}

Heartbeat();
}
}

internal void Release(string resource)
private void Release()
{
try {
RemoveDeadLocks(resource);

// Remove resource lock
using (var repository = new Repository()) {
var distributedLocks = repository.Session.Query<DistributedLocks>().Where(t => t.Resource == _resource && t.ClientId == _options.ClientId).ToList();

foreach (var distributedLock in distributedLocks) {
repository.Delete(distributedLock);
lock (_lock)
{
if (_distributedLock != null)
{
// Non blocking session!
try
{
using (var session = _storage.Repository.OpenSession())
{
session.Delete(_distributedLock.Id);
session.SaveChanges();
}
}
catch
{
Console.WriteLine("Unable to delete lock: {0}", _resource);
}

_distributedLock = null;
}

if (_heartbeatTimer != null) {
// Stop timer
if (_heartbeatTimer != null)
{
_heartbeatTimer.Dispose();
_heartbeatTimer = null;
}
} catch (Exception ex) {
throw new RavenDistributedLockException(string.Format("Could not release a lock on the resource '{0}': {1}.", _resource, "Check inner exception for details"), ex);
}
}

private void StartHeartBeat(string resource)
private void Heartbeat()
{
Console.WriteLine("Starting heartbeat for resource: " + resource);
TimeSpan timerInterval = TimeSpan.FromMilliseconds(_options.DistributedLockLifetime.TotalMilliseconds / 5);
Console.WriteLine(".Starting heartbeat for resource: {0}", _resource);
TimeSpan timerInterval = TimeSpan.FromMilliseconds(_timeout.TotalMilliseconds / 3);

_heartbeatTimer = new Timer(state =>
{
try {
using (var repository = new Repository()) {
var distributedLocks = repository.Session.Query<DistributedLocks>().Where(t => t.Resource == resource && t.ClientId == _options.ClientId).ToList();

foreach (var distributedLock in distributedLocks) {
distributedLock.Heartbeat = DateTime.UtcNow;
repository.Save(distributedLock);
}
try
{
Console.WriteLine("..Heartbeat for resource {0}", _resource);
using (var session = _storage.Repository.OpenSession())
{
var distributedLock = session.Load<DistributedLock>(_distributedLock.Id);

session.Advanced.AddExpire(distributedLock, DateTime.UtcNow + _timeout);
session.SaveChanges();
}
} catch (Exception ex) {
Console.WriteLine("Unable to update heartbeat on the resource '{0}'", ex, resource);
}
}, null, timerInterval, timerInterval);
}

private void RemoveDeadLocks(string resource)
{
using (var repository = new Repository()) {
var heartBeat = DateTime.UtcNow.Subtract(_options.DistributedLockLifetime);
var deadLocks = repository.Session.Query<DistributedLocks>().Where(t => t.Resource == resource && t.Heartbeat < heartBeat).ToList();

foreach (var deadlock in deadLocks) {
repository.Delete(deadlock);
catch (Exception ex)
{
Console.WriteLine("...Unable to update heartbeat on the resource '{0}'", ex, _resource);
Release();
}
}
}, null, timerInterval, timerInterval);
}
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
using System;

namespace Hangfire.Raven.DistributedLock
namespace Hangfire.Raven.DistributedLocks
{
/// <summary>
/// Represents exceptions for distributed lock implementation for MongoDB
Expand Down
3 changes: 0 additions & 3 deletions src/Hangfire.Raven/Entities/AggregatedCounter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,8 @@ namespace Hangfire.Raven.Entities
public class AggregatedCounter
{
public string Id { get; set; }

public string Key { get; set; }

public long Value { get; set; }

public DateTime? ExpireAt { get; set; }
}
}
9 changes: 3 additions & 6 deletions src/Hangfire.Raven/Entities/Counter.cs
Original file line number Diff line number Diff line change
@@ -1,18 +1,15 @@
using Hangfire.Raven.Entities.Identity;
using System;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace Hangfire.Raven.Entities
{
public class Counter : BaseEntity
public class Counter
{
public string Key { get; set; }
public string Id { get; set; }

public int Value { get; set; }

public DateTime? ExpireAt { get; set; }
}
}
Loading

0 comments on commit 343603f

Please sign in to comment.