#include "buckets.h"
#include "../data_structures/duplicatesRemover.h"
#include "../utils/randomizer.h"

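// Set up the edge buckets and distribute every edge of the input graph into them.
// The first bucket is kept in internal memory; the remaining buckets reside in
// external memory.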
template <NodeCount nodesPerBucket>
void Buckets<nodesPerBucket>::initBuckets()
{

    DS_VERBOSE1( std::cout << std::endl << "create external buckets" << std::endl;
                 Percent percent(_extBuckets.size()) );

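    // The first bucket covers the _noOfNodesInIntMem lowest node IDs and lives in
    // internal memory; bucketID() yields -1 for these nodes (see the verbose output below).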
    _firstExtBucket = new InternalMemoryBucket(_noOfNodesInIntMem, 0);
    for (BucketID i = 0; i < _extBuckets.size(); i++) {
        DS_VERBOSE1( percent.printStatus(i) );
        _extBuckets[i] = new EdgesOfSeveralNodes( _prefetchPool, _writePool, 0 );
    }

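    // Optionally permute the node IDs with the Feistel-based randomizer so that the
    // bucket assignment no longer depends on the original input ordering.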
    DS_RANDOMIZE( Randomizer<RandomizerFeistel> randomizer(_graph.noOfNodes()) );

    DS_VERBOSE1( std::cout << std::endl
                           << "distribute edges to external buckets" << std::endl;
                 percent.reinit(_graph.size(), true) );

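    // Consume the input graph edge by edge (from the back) and move each edge into
    // the bucket of its source node.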
    for (; !_graph.empty(); _graph.pop_back()) {
        DS_DONT_RANDOMIZE( RelabeledEdge edge(_graph.back()) );
        DS_RANDOMIZE( RelabeledEdge edge = randomizer.randomize(_graph.back()) );

        DS_VERBOSE1( percent.printStatus(_graph.size()) );

        DS_VERBOSE2( std::cout << edge; )

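        // Orient the edge so that the larger endpoint becomes the source;
        // the source node determines the bucket the edge is stored in.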
        if (edge.source() < edge.target()) edge.swap();

        DS_VERBOSE2( std::cout << " swap " << edge
                               << " bucketID: " << bucketID(edge.source())
                               << " relabeled " );

        addToExternalBucket(edge);

        DS_VERBOSE2(
            if (bucketID(edge.source()) == -1)
                std::cout << _firstExtBucket->back() << std::endl;
            else
                std::cout << _extBuckets[bucketID(edge.source())]->top() << std::endl; )
    }

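    // All edges have been moved into the buckets; the input graph is no longer needed.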
    delete &_graph;

    DS_LOGGING1( logging.events().push_back(
        LoggingEvent("edges distributed to external buckets")) );
}

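// Node reduction: process the external buckets from the last one down to the first.
// The edges of the current bucket are scattered into per-node internal buckets; each
// node is then relabeled to the endpoint returned by determineMinEdge(), and its
// remaining edges are reinserted into the buckets, either directly or through the
// duplicates remover.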
template <NodeCount nodesPerBucket>
void Buckets<nodesPerBucket>::reduceNodes()
{

    DS_REMOVE_DUPLICATES(
        DuplicatesRemover< Buckets<nodesPerBucket> > *duplRem =
            new DuplicatesRemover< Buckets<nodesPerBucket> >(this);
    )

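    // All internal per-node buckets share a single block pool.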
    PoolEdgesOfOneNode *pool = new PoolEdgesOfOneNode();
    for (NodeCount i = 0; i < nodesPerBucket; i++) {
        _intBuckets[i].setPool(pool);
    }

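    // The node IDs covered by bucket i start at _noOfNodesInIntMem + i * nodesPerBucket.
    // Start one bucket past the end and move the boundary back in each iteration.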
    _firstNodeIDofCurrentBucket = _noOfNodesInIntMem
                                  + ( _extBuckets.size() * nodesPerBucket );

    for (BucketID currentExtBucketID = _extBuckets.size()-1; !_extBuckets.empty();
         currentExtBucketID--) {

        _firstNodeIDofCurrentBucket -= nodesPerBucket;

        EdgesOfSeveralNodes & currentExtBucket = *_extBuckets[currentExtBucketID];

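        // Raise the prefetch aggressiveness while this bucket is read sequentially.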
        currentExtBucket.set_prefetch_aggr( DS_EXT_PREFETCH_POOL );

        DS_LOGGING2( logging.buckets().push_back(
            LoggingBucket(currentExtBucketID, nodesPerBucket,
                          currentExtBucket.size())) );

        DS_VERBOSE1( std::cout << std::endl
                               << "process bucket " << currentExtBucketID << std::endl
                               << " read ";
                     Percent percent(currentExtBucket.size(), true, 20) );

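        // Scatter the bucket's edges into the per-node internal buckets.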
        for (; ! currentExtBucket.empty(); currentExtBucket.pop() ) {
            DS_VERBOSE1( percent.printStatus(currentExtBucket.size()) );
            _intBuckets[currentExtBucket.top().source() - _firstNodeIDofCurrentBucket]
                .push(currentExtBucket.top());
        }

        DS_LOGGING2( logging.buckets().back().setBlocksUsed(pool->noOfBlocksUsed());
                     logging.buckets().back().setBlocksFree(pool->noOfBlocksFree()) );

        DS_VERBOSE1( std::cout << std::endl << " relabel ";
                     percent.reinit(nodesPerBucket, true, 20) );

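        // Walk through the nodes of this bucket from the highest local index down to 0.
        // NodeID is unsigned, so the loop ends when the index wraps around and becomes
        // >= nodesPerBucket.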
        for (NodeID currentNodeIndex = nodesPerBucket-1;
             currentNodeIndex < nodesPerBucket;
             currentNodeIndex--) {

            EdgesOfOneNode &currentIntBucket = _intBuckets[currentNodeIndex];

            DS_LOGGING2( logging.buckets().back().
                         addEdgesProcessed(currentIntBucket.size()) );
            DS_VERBOSE1( percent.printStatus(currentNodeIndex) );
            DS_VERBOSE2( std::cout << std::endl
                                   << "** currentNodeIndex = " << currentNodeIndex );

            if ( currentIntBucket.empty() ) continue;

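            // Determine the node's minimum edge (recorded in _result); its other
            // endpoint becomes the new label for the node's remaining edges.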
            NodeID newSource = currentIntBucket.determineMinEdge(_result);

            for (; ! currentIntBucket.empty(); currentIntBucket.pop() ) {

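                // Relabeling an edge whose target is newSource would create a
                // self-loop; such edges are dropped.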
                if ( currentIntBucket.top().target() != newSource ) {

                    DS_DONT_REMOVE_DUPLICATES(
                        add( RelabeledEdge(currentIntBucket.top(), newSource) )
                    );

                    DS_REMOVE_DUPLICATES(
                        duplRem->insert( currentIntBucket.top(), newSource )
                    );

                }
            }

            DS_REMOVE_DUPLICATES( duplRem->clear(newSource) );

        }

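        // Log the current resident memory of the running 'mst' process.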
        DS_LOGGING2(
            system("ps -C mst -o rss,%mem --no-headers | head -n 1 >> memoryLog.tmp") );

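        // The current bucket has been fully consumed: free it and grant the internal
        // pool an additional DS_EXT_BLOCK_SIZE * DS_EXT_PAGE_SIZE of reserve memory.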
        delete _extBuckets[currentExtBucketID];
        _extBuckets.pop_back();

        pool->increaseReserveMemory( DS_EXT_BLOCK_SIZE * DS_EXT_PAGE_SIZE );
    }

    DS_LOGGING2( logging.buckets().push_back(
        LoggingBucket(-1, _firstExtBucket->noOfNodes(),
                      _firstExtBucket->noOfEdges())) );

    delete pool;
    DS_REMOVE_DUPLICATES( delete duplRem );
}

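// Number of external buckets needed for noOfNodes nodes when noOfNodesInIntMem of them
// fit into internal memory: ceil(external nodes / nodesPerBucket) plus one extra bucket.
// Worked example (hypothetical numbers): noOfNodes = 1,050,000, noOfNodesInIntMem = 250,000,
// nodesPerBucket = 100,000 gives 800,000 external nodes -> 8 buckets covering them,
// plus the extra one = 9.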
template <NodeCount nodesPerBucket>
typename Buckets<nodesPerBucket>::BucketID
Buckets<nodesPerBucket>::noOfExtBuckets(NodeCount noOfNodes, NodeCount noOfNodesInIntMem) {
    if (noOfNodes <= noOfNodesInIntMem) return 0;

    NodeCount noOfNodesInExtMem = noOfNodes - noOfNodesInIntMem;
    BucketID noOfBuckets = noOfNodesInExtMem / nodesPerBucket;
    if (noOfNodesInExtMem % nodesPerBucket != 0) noOfBuckets++;
    noOfBuckets++;

    return noOfBuckets;
}