using System;
using System.Diagnostics;
using System.Collections.Generic;
using System.Threading;
using System.Linq;
#pragma warning disable 0420
public partial class LockFreeDictionary<K, V> : IDictionary<K, V>
where K : IEquatable<K>
where V : IEquatable<V>
{
/// <summary>
/// Outcome of a conditional bucket operation. In this file it is returned by
/// <c>BucketHead.CanCarouselTo</c>: <c>Impossible</c> when the entry-link transaction reports an unexpected
/// value, <c>Expired</c> when the bucket no longer holds the captured value, and <c>Ok</c> when the bucket
/// was exchanged. <c>None</c> (0) is the default value.
/// </summary>
internal enum TryResult { None = 0, Expired, Impossible, Ok };
public partial class Config
{
internal abstract partial class BucketList
{
/// Layout of the Int64 BucketHead:
///
///                         cccccccc
/// qqqqqqqqqqqqqqqqqqqqqqqq|      |eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee
/// +-----------------------+------+-------------------------------+
/// 3              3               2               1               0
/// FEDCBA9876543210FEDCBA9876543210FEDCBA9876543210FEDCBA9876543210
///
/// q: sequence number, 0 - (2^24 - 1), in order to avoid ABA failures
/// c: number of entries in the chain (8 bits; only the low 7 bits are read back)
/// e: global index of the first entry in a chain of entries, 0 - (2^32 - 1)
///
internal partial struct BucketHead
{
/// <summary>
/// Wraps a bucket value that the caller already knows, without reading the bucket array.
/// </summary>
/// <param name="bl">bucket list that owns the bucket</param>
/// <param name="bx">index of the bucket within <paramref name="bl"/></param>
/// <param name="v">64-bit bucket value to treat as the captured value</param>
BucketHead(BucketList bl, uint bx, Int64 v)
{
    this.v = v;
    this.bx = bx;
    this.bl = bl;
}
/// <summary>
/// Captures the current 64-bit value of bucket <paramref name="bx"/> in <paramref name="bl"/>.
/// </summary>
/// <param name="bl">bucket list that owns the bucket</param>
/// <param name="bx">index of the bucket within <paramref name="bl"/></param>
internal BucketHead(BucketList bl, uint bx)
{
    this.bx = bx;
    this.bl = bl;
    if (IntPtr.Size != 4)
    {
        // pointer size is not 4 bytes: plain array read followed by a full memory barrier
        this.v = bl.rg[bx];
        Thread.MemoryBarrier();
    }
    else
    {
        // 4-byte pointers: obtain the Int64 through an interlocked exchange of 0 for 0
        this.v = Interlocked.CompareExchange(ref bl.rg[bx], 0, 0);
    }
}
/// <summary>
/// Captures the value of bucket <paramref name="bx"/> together with a copy of its first entry.
/// While the captured entry count is non-zero, the entry at the captured first index is copied into
/// <paramref name="e"/> and the bucket is read again; the constructor returns only when the re-read value
/// equals the captured one, otherwise it adopts the newer value and repeats. When the captured count is
/// (or becomes) zero, <paramref name="e"/> is set to its default value.
/// </summary>
/// <param name="cfg">configuration whose <c>GetEntryHot</c> supplies the entry copy</param>
/// <param name="bl">bucket list that owns the bucket</param>
/// <param name="bx">index of the bucket within <paramref name="bl"/></param>
/// <param name="e">receives the first entry, or <c>default(EntryBlock.Entry)</c> for an empty bucket</param>
internal BucketHead(Config cfg, BucketList bl, uint bx, out EntryBlock.Entry e)
: this(bl, bx)
{
while (CapturedEntryCount > 0)
{
// copy the entry that the captured value names as first in the chain
cfg.GetEntryHot(CapturedFirstEntryIndex, out e);
// read the bucket again, using the same two read paths as the (bl, bx) constructor
Int64 new_v;
if (IntPtr.Size == 4)
new_v = Interlocked.CompareExchange(ref bl.rg[bx], 0, 0);
else
{
new_v = bl.rg[bx];
Thread.MemoryBarrier();
}
// unchanged bucket value: the entry copy and the captured value belong together
if (this.v == new_v)
return;
// the bucket changed while the entry was being copied: adopt the newer value and retry
this.v = new_v;
}
// empty bucket: there is no first entry to hand back
e = default(EntryBlock.Entry);
}
/// <summary>
/// bucket list that the target bucket resides in and to which it belongs; its <c>rg</c> array holds the
/// live bucket values and its <c>cfg</c> gives access to the entry storage
/// </summary>
readonly BucketList bl;
/// <summary>
/// index in the target BucketList from which the target was originally obtained and will be (possibly)
/// re-stored
/// </summary>
readonly uint bx;
/// <summary>
/// originally witnessed version of the target bucket. This BucketHead structure only allows the target
/// bucket to be modified if the proposed new value can be atomically exchanged with this value.
/// (The entry-capturing constructor may reassign it while it retries its read.)
/// </summary>
readonly Int64 v;
/// <summary>bits 32-39 of the bucket value: the entry-count field</summary>
internal const Int64 EntryCountMask = 0x000000FF00000000;
/// <summary>value that, when added to a bucket value, advances the sequence field (bit 40 and up) by one</summary>
internal const Int64 SequenceIncrement = 0x0000010000000000;
/// <summary>bits 40-63 of the bucket value: the 24-bit sequence field</summary>
internal const Int64 SequenceMask = unchecked((Int64)0xFFFFFF0000000000);
/// <summary>bits 0-31 of the bucket value: global index of the first entry</summary>
internal const Int64 FirstIndexMask = 0x00000000FFFFFFFF;
/// <summary>0x80 (128); the count mask below is derived from it</summary>
internal const Int32 EntriesPerBucketMax = 0x80;
/// <summary>
/// 0x7F (127): mask applied when the entry count is read back, and the count at which
/// <c>TryInsertFirstItem</c> requests a bucket resize instead of inserting
/// </summary>
internal const Int32 EntriesPerBucketMask = EntriesPerBucketMax - 1;
/// <summary>
/// Low 32 bits of the captured bucket value, reinterpreted as a signed global entry index
/// (a stored 0xFFFFFFFF reads back as -1).
/// </summary>
/// <remarks>
/// The narrowing conversion deliberately discards the count and sequence bits, so it is marked
/// <c>unchecked</c>; otherwise a build with overflow checking enabled would throw on any non-trivial value.
/// </remarks>
internal int CapturedFirstEntryIndex { get { return unchecked((int)v); } }
/// <summary>
/// Number of entries recorded in the captured bucket value (bits 32 and up, masked with
/// <c>EntriesPerBucketMask</c>).
/// </summary>
internal int CapturedEntryCount { get { return (int)(v >> 32) & EntriesPerBucketMask; } }
/// <summary>Configuration of the bucket list that this bucket head was captured from.</summary>
internal Config Config { get { return this.bl.cfg; } }
/// <summary>
/// True when the bucket's present value differs from the value captured by this bucket head.
/// </summary>
internal bool IsExpired
{
    get
    {
        Int64 current;
        if (IntPtr.Size != 4)
        {
            // plain read followed by a full memory barrier
            current = bl.rg[bx];
            Thread.MemoryBarrier();
        }
        else
        {
            // 4-byte pointers: read through an interlocked exchange of 0 for 0
            current = Interlocked.CompareExchange(ref bl.rg[bx], 0, 0);
        }
        return current != v;
    }
}
/// <summary>
/// Reads the value stored at global entry index <paramref name="gx"/> and then verifies that the bucket
/// still holds the captured value.
/// </summary>
/// <param name="gx">global index of the entry to read</param>
/// <param name="value">receives whatever <c>GetValueHot</c> produced, even when false is returned</param>
/// <returns>true if the bucket was unchanged after the read; false if it had expired</returns>
internal bool TryGetValue(int gx, out V value)
{
    bl.cfg.GetValueHot(gx, out value);
    if (IsExpired)
        return false;
    return true;
}
/// <summary>
/// Reads the key stored at global entry index <paramref name="gx"/> and then verifies that the bucket
/// still holds the captured value.
/// </summary>
/// <param name="gx">global index of the entry to read</param>
/// <param name="key">receives whatever <c>GetKeyHot</c> produced, even when false is returned</param>
/// <returns>true if the bucket was unchanged after the read; false if it had expired</returns>
internal bool TryGetKey(int gx, out K key)
{
    bl.cfg.GetKeyHot(gx, out key);
    if (IsExpired)
        return false;
    return true;
}
/// <summary>
/// Attempts to replace the bucket with a value that has the next sequence number, the given entry count
/// and the given first-entry index. The exchange only happens if the bucket still holds the captured value.
/// </summary>
/// <param name="gx_first">global index of the new first entry (-1 is stored as 0xFFFFFFFF)</param>
/// <param name="c_ent">new entry count, placed at bits 32 and up</param>
/// <returns>true if the bucket held the captured value and was exchanged; false otherwise</returns>
internal bool TryUpdate(int gx_first, int c_ent)
{
    Int64 proposed;
    // The sequence field includes the sign bit, so advancing it eventually wraps the Int64, and a
    // first index of -1 is converted to UInt32 on purpose. Both must wrap silently regardless of the
    // project's overflow-checking setting.
    unchecked
    {
        proposed = ((v + SequenceIncrement) & SequenceMask) |
                   ((Int64)c_ent << 32) | (UInt32)gx_first;
    }
    return Interlocked.CompareExchange(ref bl.rg[bx], proposed, v) == v;
}
/// <summary>
/// Advances the bucket's sequence number so that bucket heads captured earlier no longer match it.
/// Makes at most two exchange attempts and does not report whether either one succeeded.
/// </summary>
internal void Invalidate()
{
    // try to effect our change, or get the current value.
    // The sequence field includes the sign bit, so the addition is explicitly unchecked: it must wrap
    // rather than throw when the project is built with overflow checking enabled.
    Int64 v_cur = Interlocked.CompareExchange(ref bl.rg[bx], unchecked(v + SequenceIncrement), v);
    // we never have to try more than twice because if a change based on the new value isn't
    // accepted, we know that there must have been an invalidation since the time of this function
    // call which we can count for this function as well (invalidations are indistinguishable)
    if (v_cur != v)
        Interlocked.CompareExchange(ref bl.rg[bx], unchecked(v_cur + SequenceIncrement), v_cur);
}
/// <summary>
/// Walks the captured chain looking for the first entry whose hash equals <paramref name="h_in"/> and
/// whose key equals <paramref name="key"/>.
/// </summary>
/// <param name="h_in">hash to compare against each entry's stored hash</param>
/// <param name="key">key to look for</param>
/// <param name="gx">
/// on a true return: global index of the matching entry, or -1 if the chain holds no match;
/// on a false return: the index at which the walk stopped
/// </param>
/// <returns>false if the bucket was found to have changed during the walk; true otherwise</returns>
internal bool TryFindKey(uint h_in, K key, out int gx)
{
    int remaining = CapturedEntryCount;
    gx = CapturedFirstEntryIndex;
    while (remaining-- > 0)
    {
        int gx_following;
        if (bl.cfg.GetHashAndNextHot(gx, out gx_following) != h_in)
        {
            // hash mismatch: only make sure the chain being walked is still the captured one
            if (IsExpired)
                return false;
        }
        else
        {
            K candidate;
            if (!TryGetKey(gx, out candidate))
                return false;
            if (candidate.Equals(key))
                return true;
        }
        gx = gx_following;
    }
    gx = -1;
    return true;
}
/// <summary>
/// Walks the whole captured chain, recording where <paramref name="key"/> was seen and which entry is
/// last in the chain. The walk does not stop at a match, so a later match overwrites an earlier one.
/// </summary>
/// <param name="h_in">hash to compare against each entry's stored hash</param>
/// <param name="key">key to look for (not modified here)</param>
/// <param name="gx_found">global index of the matching entry, or -1 if none was seen</param>
/// <param name="i_found">zero-based chain position of the matching entry, or -1 if none was seen</param>
/// <param name="gx_last">global index of the last entry visited, or -1 for an empty chain</param>
/// <returns>false if the bucket was found to have changed during the walk; true otherwise</returns>
internal bool TryFindKey(uint h_in, ref K key, out int gx_found, out int i_found, out int gx_last)
{
    gx_found = -1;
    i_found = -1;
    gx_last = -1;
    int cursor = CapturedFirstEntryIndex;
    int total = CapturedEntryCount;
    int position = 0;
    while (position < total)
    {
        gx_last = cursor;
        if (bl.cfg.GetHashAndNextHot(gx_last, out cursor) != h_in)
        {
            // hash mismatch: only make sure the chain being walked is still the captured one
            if (IsExpired)
                return false;
        }
        else
        {
            K candidate;
            if (!TryGetKey(gx_last, out candidate))
                return false;
            if (candidate.Equals(key))
            {
                i_found = position;
                gx_found = gx_last;
            }
        }
        position++;
    }
    return true;
}
/// <summary>
/// Attempts to unlink the first entry of the captured chain: the bucket is updated to start at the
/// first entry's successor (or -1 when the chain becomes empty) with the count reduced by one.
/// </summary>
/// <returns>false if the captured chain is empty or the bucket update was not accepted</returns>
internal bool TryRemoveFirstItem()
{
    int remaining = CapturedEntryCount - 1;
    if (remaining < 0)
        return false;
    int gx_head;
    if (remaining == 0)
        gx_head = -1;
    else
        gx_head = bl.cfg.GetNextHot(CapturedFirstEntryIndex);
    return TryUpdate(gx_head, remaining);
}
/// <summary>
/// Attempts to push <paramref name="e"/> onto the front of the captured chain. If the captured count has
/// reached <c>EntriesPerBucketMask</c>, a bucket resize is requested instead and nothing is inserted.
/// </summary>
/// <param name="mtid">passed through to the resize request and the index allocation/release calls</param>
/// <param name="e">entry to store; its <c>gx_next</c> is overwritten to link to the current first entry</param>
/// <returns>true if the bucket accepted the new head; false otherwise</returns>
internal bool TryInsertFirstItem(ref int mtid, ref EntryBlock.Entry e)
{
    Config config = bl.cfg;
    int count = CapturedEntryCount;
    if (count == EntriesPerBucketMask)
    {
        config.RequestBucketResize(ref mtid);
        return false;
    }
    if (count == 0)
        e.gx_next = -1;
    else
        e.gx_next = CapturedFirstEntryIndex;
    int gx_new = config.GetUnusedIndex(ref mtid);
    config.StoreEntryHot(gx_new, ref e);
    if (!TryUpdate(gx_new, count + 1))
    {
        // the bucket changed underneath us: hand the freshly obtained index back
        config.ReleaseIndex(ref mtid, gx_new);
        return false;
    }
    return true;
}
/// <summary>
/// Attempts to re-point the bucket at <paramref name="gx_target"/> after linking the last entry's
/// <c>next</c> to the current first entry, keeping the entry count and advancing the sequence number.
/// </summary>
/// <param name="gx_last">global index of the last entry in the chain; must not be -1</param>
/// <param name="gx_target">global index that becomes the bucket's first entry</param>
/// <param name="bh_new">on <c>TryResult.Ok</c>, receives a bucket head describing the new bucket value</param>
/// <returns>
/// <c>Impossible</c> if <c>TransactEntryNext</c> reports a value other than -1 or the captured first index;
/// <c>Expired</c> if the bucket no longer held the captured value; <c>Ok</c> otherwise
/// </returns>
internal TryResult CanCarouselTo(int gx_last, int gx_target, ref BucketHead bh_new)
{
    Debug.Assert(gx_last != -1);
    // NOTE(review): presumably swaps the last entry's next link from -1 to the first index and returns
    // the link's previous value - confirm against Config.TransactEntryNext
    int n = bl.cfg.TransactEntryNext(gx_last, CapturedFirstEntryIndex, -1);
    if (n != -1 && n != CapturedFirstEntryIndex)
        return TryResult.Impossible;
    // The sequence field includes the sign bit, so advancing it must wrap rather than throw when the
    // project is built with overflow checking enabled; hence the explicit unchecked context.
    Int64 new_v = unchecked(((v + SequenceIncrement) & ~FirstIndexMask) | (UInt32)gx_target);
    if (Interlocked.CompareExchange(ref bl.rg[bx], new_v, v) != v)
        return TryResult.Expired;
    // assigned last: bh_new is a ref parameter and may refer to this very instance
    bh_new = new BucketHead(bl, bx, new_v);
    return TryResult.Ok;
}
/// <summary>
/// Stores <paramref name="e"/> at a freshly obtained index and attempts to make that index the bucket's
/// first entry while keeping the captured entry count. <c>e.gx_next</c> is not modified here.
/// </summary>
/// <param name="mtid">passed through to the index allocation/release calls</param>
/// <param name="e">entry to store</param>
/// <returns>true if the bucket accepted the new head; false (index released again) otherwise</returns>
internal bool TryReplaceFirst(ref int mtid, ref EntryBlock.Entry e)
{
    Config config = bl.cfg;
    int gx_new = config.GetUnusedIndex(ref mtid);
    config.StoreEntryHot(gx_new, ref e);
    bool accepted = TryUpdate(gx_new, CapturedEntryCount);
    if (!accepted)
        config.ReleaseIndex(ref mtid, gx_new);
    return accepted;
}
/// <summary>
/// Rotates the chain one entry at a time - each step moves a copy of the current last entry to the head -
/// for (captured entry count - <paramref name="i_target"/> - 1) steps, which is the number of steps after
/// which an entry starting at position <paramref name="i_target"/> occupies the last position.
/// </summary>
/// <param name="mtid">passed through to the index allocation/release calls made by <c>RotateOnce</c></param>
/// <param name="gx_last">global index of the current last entry; must not be -1</param>
/// <param name="i_target">zero-based chain position of the target entry; reused as the countdown</param>
/// <param name="bh_new">
/// bucket head on which every rotation is performed; replaced by the post-rotation bucket head after each
/// successful step
/// </param>
/// <returns>false as soon as a rotation or a walk to the last entry fails; true when all steps are done</returns>
internal bool TryRotateToEnd(ref int mtid, int gx_last, int i_target, ref BucketHead bh_new)
{
Debug.Assert(gx_last != -1);
// couldn't use any tricks. rotate one at a time
// Only the entry count is taken from this instance; the rotations themselves run on bh_new.
// NOTE(review): presumably bh_new initially describes the same captured value as this - confirm at callers.
// NOTE(review): if i_target is already the last position the countdown starts at 0 and the
// pre-decrement test below never sees 0 - presumably callers never pass that; confirm.
i_target = CapturedEntryCount - i_target - 1;
while (true)
{
// bh_new is both the receiver and the ref argument, so a successful step replaces it in place
if (!bh_new.RotateOnce(ref mtid, gx_last, ref bh_new))
return false;
if (--i_target == 0)
return true;
// find the last entry of the rotated chain for the next step
if (!bh_new.TryGetLast(out gx_last))
return false;
}
}
/// <summary>
/// Performs one rotation step: copies the entry at <paramref name="gx_last"/> into a freshly obtained
/// index, links the copy in front of the current first entry and attempts to make it the bucket's first
/// entry (same count, next sequence number). On success the old <paramref name="gx_last"/> index is released.
/// </summary>
/// <param name="mtid">passed through to the index allocation/release calls</param>
/// <param name="gx_last">global index of the chain's last entry</param>
/// <param name="bh_new">on success, receives a bucket head describing the new bucket value</param>
/// <returns>true if the bucket accepted the new head; false (new index released again) otherwise</returns>
bool RotateOnce(ref int mtid, int gx_last, ref BucketHead bh_new)
{
    Config cfg = bl.cfg;
    EntryBlock.Entry e;
    cfg.GetEntryHot(gx_last, out e);
    e.gx_next = CapturedFirstEntryIndex;
    int gx_new = cfg.GetUnusedIndex(ref mtid);
    cfg.StoreEntryHot(gx_new, ref e);
    // The sequence field includes the sign bit, so advancing it must wrap rather than throw when the
    // project is built with overflow checking enabled; hence the explicit unchecked context.
    Int64 new_v = unchecked(((v + SequenceIncrement) & ~FirstIndexMask) | (UInt32)gx_new);
    if (Interlocked.CompareExchange(ref bl.rg[bx], new_v, v) != v)
    {
        cfg.ReleaseIndex(ref mtid, gx_new);
        return false;
    }
    cfg.ReleaseIndex(ref mtid, gx_last);
    // assigned last: TryRotateToEnd passes the receiver itself as bh_new, so this overwrites 'this'
    bh_new = new BucketHead(bl, bx, new_v);
    return true;
}
/// <summary>
/// Follows the captured chain from its first entry to its last one.
/// </summary>
/// <param name="gx_last">receives the index reached; the last entry's global index on a true return</param>
/// <returns>false if the bucket was found to have changed during the walk; true otherwise</returns>
bool TryGetLast(out int gx_last)
{
    int count = CapturedEntryCount;
    Debug.Assert(count > 0);
    gx_last = CapturedFirstEntryIndex;
    int hops = count - 1;
    while (hops-- > 0)
    {
        gx_last = bl.cfg.GetNextHot(gx_last);
        if (IsExpired)
            return false;
    }
    return true;
}
};
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// <summary>
/// BucketList (abstract)
/// </summary>
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
internal abstract partial class BucketList
{
/// <summary>bucket array; each element is one packed 64-bit bucket value</summary>
protected readonly Int64[] rg;
/// <summary>configuration that this bucket list was created for</summary>
protected readonly Config cfg;
/// <summary>set by <c>InvalidateAllBuckets</c> once an invalidation pass has run to completion</summary>
private bool f_closed = false;
/// <summary>Creates a bucket list over an existing bucket array.</summary>
/// <param name="cfg">owning configuration</param>
/// <param name="rg">bucket array to use as-is</param>
internal BucketList(Config cfg, Int64[] rg)
{
    this.rg = rg;
    this.cfg = cfg;
}
/// <summary>
/// Creates a bucket list with a new bucket array whose length is <c>HashFriendly(c_entries)</c>.
/// </summary>
/// <param name="cfg">owning configuration</param>
/// <param name="c_entries">requested size, rounded by <c>HashFriendly</c></param>
internal BucketList(Config cfg, uint c_entries)
    : this(cfg, new Int64[HashFriendly(c_entries)])
{
}
/// <summary>True while this list is the one referenced by <c>cfg.m_buckets</c>.</summary>
internal bool IsCurrentForConfig
{
    get
    {
        return cfg.m_buckets == this;
    }
}
/// <summary>Number of buckets, i.e. the length of the bucket array.</summary>
[DebuggerBrowsable(DebuggerBrowsableState.Never)]
internal uint BucketCount
{
    get
    {
        int length = rg.Length;
        return (uint)length;
    }
}
/// <summary>
/// Captures the bucket that <paramref name="hash"/> maps to (hash modulo the bucket count).
/// </summary>
/// <param name="mtid">not used by this base implementation</param>
/// <param name="hash">hash code selecting the bucket</param>
/// <param name="bh">receives the captured bucket head</param>
/// <returns>true if, after the capture, this list is still the configuration's current bucket list</returns>
internal virtual bool GetBucketHead(ref int mtid, uint hash, out BucketHead bh)
{
    uint slot = hash % BucketCount;
    bh = new BucketHead(this, slot);
    return IsCurrentForConfig;
}
/// <summary>
/// Invalidates every bucket in turn, stopping early if the closed flag is observed set, and sets the
/// closed flag (after a memory barrier) when it is not yet set at the end.
/// </summary>
internal void InvalidateAllBuckets()
{
    uint slot = 0;
    while (!f_closed && slot < rg.Length)
    {
        BucketHead head = new BucketHead(this, slot);
        head.Invalidate();
        slot++;
    }
    if (f_closed)
        return;
    Thread.MemoryBarrier();
    f_closed = true;
}
/// <summary>
/// Rounds a requested size to a table size: the smallest value in <c>some_primes</c> that is not less
/// than <paramref name="c"/>, or - beyond the end of the table - the first odd number at or above
/// <paramref name="c"/> that is not divisible by 3, 5 or 7.
/// </summary>
/// <param name="c">requested size</param>
/// <returns>the rounded size</returns>
protected static uint HashFriendly(uint c)
{
    int slot = Array.BinarySearch<uint>(some_primes, c);
    if (slot < 0)
        slot = ~slot;   // not found: complement gives the index of the next larger element
    if (slot >= some_primes.Length)
    {
        // requested size is bigger than the largest value in the table
        c |= 1;
        while (c % 3 == 0 || c % 5 == 0 || c % 7 == 0)
            c += 2;
        return c;
    }
    return some_primes[slot];
}
/// <summary>
/// Ascending table of candidate bucket-array sizes, from 0x7 up to 0x989693. <c>HashFriendly</c> searches
/// it with <c>Array.BinarySearch</c>, so the values must stay sorted. (Presumably all prime, as the name
/// suggests - not verified here.)
/// </summary>
static readonly uint[] some_primes = {
0x000007, 0x00000B, 0x00000D, 0x000011, 0x000017, 0x00001D, 0x000025, 0x00002F, 0x00003B, 0x000043,
0x00004F, 0x000061, 0x00007F, 0x000089, 0x0000A3, 0x0000C5, 0x000101, 0x000115, 0x000133, 0x00014B,
0x00018D, 0x000209, 0x00022D, 0x000269, 0x0002A1, 0x00031D, 0x000419, 0x00045D, 0x0004D5, 0x000551,
0x00063D, 0x000833, 0x0008BD, 0x0009AD, 0x000AA9, 0x000C83, 0x001069, 0x001181, 0x00135D, 0x00155F,
0x001915, 0x0020E3, 0x002303, 0x0026C3, 0x002AC5, 0x003235, 0x0041CB, 0x004609, 0x004D8D, 0x005597,
0x006475, 0x0083A7, 0x008C17, 0x009B1D, 0x00AB4D, 0x00C8ED, 0x010751, 0x01183D, 0x01363F, 0x0156A7,
0x0191DD, 0x020EB5, 0x02307B, 0x026C81, 0x02AD57, 0x0323BF, 0x041D73, 0x0460FD, 0x04D905, 0x055AB3,
0x064787, 0x083AFD, 0x08C201, 0x09B215, 0x0AB57B, 0x0C8F4D, 0x107603, 0x118411, 0x136441, 0x20EC13,
0x2DC6D1, 0x41D82F, 0x4C4B4B, 0x5B8D8B, 0x6ACFC3, 0x7A1209, 0x83B073, 0x90F575, 0x989693 };
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
/// <summary>
/// BucketListPrimary - a list of buckets used during normal operation
/// </summary>
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
internal partial class BucketListPrimary : BucketList
{
    /// <summary>
    /// Creates a primary list with a new bucket array sized by the base constructor from
    /// <paramref name="c_entries"/>.
    /// </summary>
    internal BucketListPrimary(Config cfg, uint c_entries)
        : base(cfg, c_entries)
    {
    }

    /// <summary>
    /// Creates a primary list over an existing bucket array.
    /// </summary>
    internal BucketListPrimary(Config cfg, Int64[] rg)
        : base(cfg, rg)
    {
    }

    /// <summary>
    /// Initiate bucket array resizing and continue on to assist with that task. Does nothing unless this
    /// list is the configuration's current bucket list and the configuration itself reports being current.
    /// </summary>
    /// <param name="mtid">passed through to <c>CheckAssist</c></param>
    internal void InitiateBucketResize(ref int mtid)
    {
        if (!this.IsCurrentForConfig || !this.cfg.IsCurrent)
            return;
        // the resize list is sized from twice the present bucket count
        uint enlarged = HashFriendly(BucketCount * 2);
        BucketListResize successor = new BucketListResize(cfg, this, enlarged);
        cfg.ChangeBuckets(this, successor);
        cfg.CheckAssist(ref mtid);
    }
};
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
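/// <summary>
/// BucketListResize - a transitional list of buckets that is installed while the bucket array is being enlarged;
/// callers that ask it for a bucket are first conscripted into moving entries out of the previous primary list
/// </summary>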
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
internal partial class BucketListResize : BucketList
{
/// <summary>primary list whose entries are moved into this list</summary>
private readonly BucketListPrimary blp;
/// <summary>count of <c>_check_assist</c> calls that got past the currency check</summary>
private int m_c_helpers;
/// <summary>set once a helper has finished its pass over the old buckets</summary>
private bool f_done = false;
/// <summary>
/// Creates the resize list with a new bucket array sized by the base constructor from
/// <paramref name="new_size"/>.
/// </summary>
/// <param name="c">owning configuration</param>
/// <param name="blp">primary list to migrate entries from</param>
/// <param name="new_size">requested size of the new bucket array</param>
internal BucketListResize(Config c, BucketListPrimary blp, uint new_size)
    : base(c, new_size)
{
    this.m_c_helpers = 0;
    this.blp = blp;
}
/// <summary>
/// Conscript all callers to assist with the bucket resizing, no matter if it's already complete or not.
/// Once the assist call returns, the request is forwarded to whichever bucket list the dictionary's
/// configuration references at that moment.
/// </summary>
/// <param name="mtid">passed through to the assist call and to the forwarded request</param>
/// <param name="hash">hash code selecting the bucket</param>
/// <param name="bh">receives the bucket head produced by the forwarded request</param>
/// <returns>the result of the forwarded request</returns>
internal override bool GetBucketHead(ref int mtid, uint hash, out BucketHead bh)
{
    _check_assist(ref mtid);
    var current = cfg.d.m_config.m_buckets;
    return current.GetBucketHead(ref mtid, hash, out bh);
}
/// <summary>
/// Allow any number of helpers to cooperatively resize the bucket array. Each caller invalidates all
/// buckets of the old primary list, then sweeps the old buckets, moving every remaining first entry into
/// this list until each old bucket it visits is empty (or another helper has signalled completion), and
/// finally tries to install a primary list over this list's bucket array.
/// </summary>
/// <param name="mtid">passed through to the index allocation/release calls</param>
internal void _check_assist(ref int mtid)
{
// nothing to help with if this list is not (or no longer) the configuration's current bucket list
if (!this.IsCurrentForConfig)
return;
// wait-free: everyone sets all closed flags until the operation is confirmed and we have a frozen count
blp.InvalidateAllBuckets();
// zero-based arrival number of this helper
uint i_helper = (uint)Interlocked.Increment(ref m_c_helpers) - 1;
uint c_buckets_old = blp.BucketCount;
// per-helper stride; note that it is derived from the NEW bucket count and later reduced modulo the old one
uint n_chunk = BucketCount / (i_helper + 3);
uint i_chunk = 0;
// the first helper announces the resize when debug output is enabled
if (i_helper == 0 && (cfg.d.m_options & Options.DebugOutput) > 0)
{
Console.WriteLine("initiated resizing from {0} to {1} ({2} items)", c_buckets_old, BucketCount, cfg.m_count);
Console.Out.Flush();
}
contention:
// the starting point for the single-stepping loop advances by a different chunk amount for each helper
i_chunk = (i_chunk + n_chunk) % c_buckets_old;
// single-stepping loop. Restarted when any kind of contention is detected
// each helper must eventually confirm zero entries in all source buckets
for (uint bx = i_chunk, c = 0; !f_done && c < c_buckets_old; bx = (bx + 1) % c_buckets_old, c++)
{
// loop until this bucket has zero entries, unless aborted first due to contention
BucketHead bh_old;
EntryBlock.Entry e;
// capture the old bucket together with a copy of its first entry
while ((bh_old = new BucketHead(cfg, blp, bx, out e)).CapturedEntryCount > 0)
{
// destination bucket in this (new) list for the entry's hash
BucketHead bh_new = new BucketHead(this, e.hash % BucketCount);
int gx;
if (!bh_new.TryFindKey(e.hash, e.key, out gx))
goto contention;
if (gx != -1)
{
// the key is already present in the new list: drop the entry from the old bucket and release its index
if (!bh_old.TryRemoveFirstItem())
goto contention;
cfg.ReleaseIndex(ref mtid, bh_old.CapturedFirstEntryIndex);
}
// not yet in the new list: insert a copy; the old entry is removed on a later pass of this while loop
else if (!bh_new.TryInsertFirstItem(ref mtid, ref e))
goto contention;
}
}
// publish completion so that helpers still sweeping can leave their loop
if (!f_done)
{
Thread.MemoryBarrier();
f_done = true;
}
// first one finished swaps out the swap set back to a primary set. Also, be sure not to swap into the latest config
// because if the dictionary was emptied while we were at work, we don't want to destroy any new items
if (this.IsCurrentForConfig)
cfg.ChangeBuckets(this, new BucketListPrimary(cfg, this.rg));
if (i_helper == 0 && (cfg.d.m_options & Options.DebugOutput) > 0)
{
Console.WriteLine("done");
Console.Out.Flush();
}
}
};
};
};