我正在使用一段處理佇列訊息的代碼(使用公共交通)。許多訊息可以并行處理。所有訊息都創建或修改 ActiveDirectory 中的物件(在這種情況下)。所有物件都需要根據 AD 架構定義進行驗證。(雖然它與問題無關,但我想指出,我們有很多客戶在他們的 AD 架構中有自定義擴展)
檢索模式資訊是一項緩慢的操作。我想做1次然后快取它。但有許多并行處理訊息。許多訊息在第一個成功之前就開始獲取模式資訊。所以做了太多的作業。目前我用一個簡單的信號量解決了這個問題。請參閱下面的代碼。
但這不是一個好的解決方案,因為現在只有 1 個執行緒可以一直輸入此代碼。
我需要一些東西來鎖定每個物件的代碼 1 次并推遲其他請求,直到第一次檢索和快取完成。
什么樣的構造可以讓我做到這一點?
private static SemaphoreSlim _lock = new SemaphoreSlim(1, 1);
public ActiveDirectorySchemaObject? GetSchemaObjectFor(string objectClass)
{
//todo: create better solution
_lock.Wait();
try
{
if (_activeDirectorySchemaContainer.HasSchemaObjectFor(
_scopeContext.CustomerId, objectClass) == false)
{
_logger.LogInformation($"Getting and caching schema from AD "
$"for {objectClass}");
_activeDirectorySchemaContainer.SetSchemaObjectFor(
_scopeContext.CustomerId, objectClass,
GetSchemaFromActiveDirectory(objectClass));
}
}
finally
{
_lock.Release();
}
return _activeDirectorySchemaContainer.GetSchemaObjectFor(
_scopeContext.CustomerId, objectClass);
}
以下是問題的可能簡化。簡而言之。我正在尋找適當的構造來鎖定一段代碼,以針對輸入的每個變化進行并行訪問。
有評論提到了懶惰。我以前沒用過的東西。但是閱讀檔案我看到它將物件的初始化推遲到以后。也許我可以為此重構。但是看看目前的代碼,我似乎需要一個懶惰的“if”或一個懶惰的“函式”,但也許我過于復雜了。我發現思考并行編程常常讓我頭疼。
根據要求,包含 setschemafor 和其他功能的架構容器類代碼。到目前為止,感謝您提供的所有資訊。
public interface IActiveDirectorySchemaContainer
{
//Dictionary<string, Dictionary<string, JObject>> schemaStore { get; }
bool HasSchemaObjectFor(string customerId, string objectClass);
ActiveDirectorySchemaObject GetSchemaObjectFor(string customerId, string objectClass);
void SetSchemaObjectFor(string customerId, string objectClass, ActiveDirectorySchemaObject schema);
}
public class ActiveDirectorySchemaContainer : IActiveDirectorySchemaContainer
{
private Dictionary<string, Dictionary<string, ActiveDirectorySchemaObject>> _schemaStore = new Dictionary<string, Dictionary<string, ActiveDirectorySchemaObject>>();
public bool HasSchemaObjectFor(string customerId, string objectClass)
{
if (!_schemaStore.ContainsKey(customerId))
return false;
if (!_schemaStore[customerId].ContainsKey(objectClass))
return false;
if (_schemaStore[customerId][objectClass] != null)
return true;
else
return false;
}
public ActiveDirectorySchemaObject GetSchemaObjectFor(string customerId, string objectClass)
{
return _schemaStore[customerId][objectClass];
}
public void SetSchemaObjectFor(string customerId, string objectClass, ActiveDirectorySchemaObject schemaObject)
{
if (HasSchemaObjectFor(customerId, objectClass))
{
_schemaStore[customerId][objectClass] = schemaObject;
}
else
{
if (!_schemaStore.ContainsKey(customerId))
{
_schemaStore.Add(customerId, new Dictionary<string, ActiveDirectorySchemaObject>());
}
if (!_schemaStore[customerId].ContainsKey(objectClass))
{
_schemaStore[customerId].Add(objectClass, schemaObject);
}
else
{
_schemaStore[customerId][objectClass] = schemaObject;
}
}
}
}
customerId是為多個客戶分離schema資訊,而容器是通過依賴注入作為單例提供的。每條訊息都可以有不同的 customerId 并同時處理。然而,我只想一次檢索模式資料。這種架構可能并不理想,但此時我不允許更改它。
public static IServiceCollection AddActiveDirectorySchemaService(
this IServiceCollection services)
{
services.AddScoped<IActiveDirectorySchemaService, ActiveDirectorySchemaService>();
services.AddSingleton<IActiveDirectorySchemaContainer, ActiveDirectorySchemaContainer>();
return services;
}
uj5u.com熱心網友回復:
以下是如何使用將ConcurrentDictionary<TKey,TValue>
物件Lazy<T>
作為值的 a ,以確保每個鍵的模式僅被初始化一次:
private readonly ConcurrentDictionary<(string CustomerId, string ObjectClass),
Lazy<Schema>> _cachedSchemas = new();
public Schema GetSchemaObjectFor(string objectClass)
{
var combinedKey = (_scopeContext.CustomerId, objectClass);
Lazy<Schema> lazySchema = _cachedSchemas.GetOrAdd(combinedKey, key =>
{
return new Lazy<Schema>(() =>
{
_logger.LogInformation($"Getting schema for {key}");
return GetSchemaFromActiveDirectory(key.ObjectClass);
});
});
return lazySchema.Value;
}
的關鍵ConcurrentDictionary<TKey,TValue>
是一個ValueTuple<string, string>
。第一個字串是客戶 ID,第二個是物件類。為這兩個字串的每個唯一組合創建一個新模式。
不幸的是,上述建議存在Lazy<T>
該類的一個主要缺陷:它關于錯誤處理的行為是不可配置的。因此,如果valueFactory
失敗,所有后續請求都Value
將收到快取錯誤。這種行為是快取系統的阻礙。幸運的是,有Lazy<T>
可用的替代實作展示了用于快取目的的正確行為,即valueFactory
如果失敗則重試。你可以在">這里找到至少三個健壯和緊湊的實作,包括我昨天發布的一個。
uj5u.com熱心網友回復:
一種相對簡單的方法是使用 aConcurrentDictionary
來保存已加載物件的快取。字典根據其鍵的哈希碼將專案劃分為桶,然后對于ConcurrentDictionary
,每個桶都有自己的鎖。使用這樣的字典將比您當前的方法提高效率。
為了避免影響 AD 控制器/資料庫/其他任何東西,我仍將使用信號量來確保一次只有一個執行緒可以請求模式。但是,這僅在字典還沒有條目時發生。
請注意,第一個選項或多或少是Theodor's answer的復雜版本,因此,如果這對您有用,則最好改用該答案。我的第二個選項可能會通過合并 Theodor 的答案來優化。
public class CachedSchemaContainer
{
private readonly SchemaRetriever _schemaRetriever;
private readonly ConcurrentDictionary<string, Schema> _schemaCache = new ConcurrentDictionary<string, Schema>();
private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);
public CachedSchemaContainer(ISchemaRetriever schemaRetriever)
{
_schemaRetriever = schemaRetriever;
}
public Schema GetSchemaObjectFor(string objectClass)
{
Schema schema;
// try and retrieve the value
if (_schemaCache.TryGetValue(objectClass, out schema))
{
return schema;
}
// OK, we need to wait our turn and try to load it from the AD controller
_semaphoreSlim.Wait();
try
{
// There's no point requerying it the last holder of the lock retrieved it, so check again
if (_schemaCache.TryGetValue(objectClass, out schema))
{
return schema;
}
// Go and get the schema, add it to the dictionary, and then return it
schema = _schemaRetriever.GetSchemaObjectFor(1, objectClass);
_schemaCache.TryAdd(objectClass, schema);
return schema;
}
finally
{
// release the semaphore
_semaphoreSlim.Release();
}
}
}
另一種可能的優化可能是快取對Schema
每個執行緒的物件的參考。這意味著在給定執行緒之前訪問過這個特定模式的情況下不需要鎖定。我們仍然有執行緒安全ConcurrentDictionary
的快取執行緒之間的值,但最終這將避免快取預熱/填充后的大量鎖定:
public class CachedSchemaContainer : IDisposable
{
private readonly ISchemaRetriever _schemaRetriever;
private readonly ConcurrentDictionary<string, Schema> _schemaCache = new ConcurrentDictionary<string, Schema>();
private readonly ThreadLocal<Dictionary<string, Schema>> _threadSchemaCache = new ThreadLocal<Dictionary<string, Schema>>(() => new Dictionary<string, Schema>());
private readonly SemaphoreSlim _semaphoreSlim = new SemaphoreSlim(1, 1);
public CachedSchemaContainer(ISchemaRetriever schemaRetriever)
{
_schemaRetriever = schemaRetriever;
}
public Schema GetSchemaObjectFor(string objectClass)
{
Schema schema;
// try and retrieve the value from the thread's cache
if (_threadSchemaCache.Value.TryGetValue(objectClass, out schema))
{
return schema;
}
// try and retrieve the value
if (_schemaCache.TryGetValue(objectClass, out schema))
{
// it was already cached in the shared dictionary, so let's add it to the thread's
_threadSchemaCache.Value[objectClass] = schema;
return schema;
}
// OK, we need to wait our turn and try to load it from the AD controller
_semaphoreSlim.Wait();
try
{
// There's no point requerying it the last holder of the lock retrieved it, so check again
if (_schemaCache.TryGetValue(objectClass, out schema))
{
// it was already cached in the shared dictionary, so let's add it to the thread's
_threadSchemaCache.Value[objectClass] = schema;
return schema;
}
// Go and get the schema, add it to the shared and thread local dictionaries, and then return it
schema = _schemaRetriever.GetSchemaObjectFor(1, objectClass);
_schemaCache.TryAdd(objectClass, schema);
_threadSchemaCache.Value[objectClass] = schema;
return schema;
}
finally
{
// release the semaphore
_semaphoreSlim.Release();
}
}
public void Dispose()
{
_threadSchemaCache.Dispose();
}
}
這些示例中使用的常見型別定義:
public interface ISchemaRetriever
{
Schema GetSchemaObjectFor(int customerId, string objectClass);
}
public class Schema
{
}
檔案鏈接:
- 并發字典
- 執行緒本地
注意:Schema
這里是一個參考型別(一個類),所以字典存盤了一個指向Schema
每個加載的公共物件的指標objectClass
。因此,如果一個執行緒對Schema
物件進行更改,則可能會破壞另一個執行緒,等等,除非Schema
物件本身也是執行緒安全的。如果你只是讀取值而不是改變Schema
物件,那么你應該不用擔心那里。
此外,正如 Theodor 指出的那樣,除非您打算async
在未來使用這種方法,否則您可能會放棄使用 aSemaphoreSlim
而只使用 simple lock (lockingObject) { }
。檔案
轉載請註明出處,本文鏈接:https://www.uj5u.com/houduan/491163.html