1: <?php
2: namespace Datadepo\Api\Synchronizers;
3: use Datadepo\Api;
4:
5:
6:
7: abstract class AbstractSynchronizer
8: {
9:
10:
11: protected $dataStore;
12:
13:
14: protected $iniConfiguration;
15:
16:
17: private $_wrapper;
18:
19: 20: 21: 22:
23: public function __construct(Api\DataStores\IDataStore $dataStore, Api\IniConfiguration $iniConfiguration)
24: {
25: $this->dataStore = $dataStore;
26: $this->iniConfiguration = $iniConfiguration;
27: }
28:
29: 30: 31:
32: abstract protected function makeSync();
33:
34: 35: 36:
37: abstract protected function wrapLine($line);
38:
39: 40: 41:
42: abstract protected function processChunk(Api\Collector $collector);
43:
44:
45: 46: 47:
48: public function sync()
49: {
50: try {
51: $running = $this->runningState();
52: $this->suspendedState();
53: $response = $this->makeSync();
54: $running->delete();
55: return $response;
56: }
57: catch (Api\DataDepoRunningException $ex) {
58: return new Api\DataDepoResponse(Api\DataDepoResponse::CODE_RUNNING);
59: }
60: catch (Api\DataDepoSuspendedException $ex) {
61: $running->delete();
62: return new Api\DataDepoResponse(Api\DataDepoResponse::CODE_SUSPENDED);
63: }
64: catch (\Exception $ex) {
65: if ($running !== NULL) {
66: $running->delete();
67: }
68: throw $ex;
69: }
70: }
71:
72: 73: 74:
75: public function getWrapper()
76: {
77: if ($this->_wrapper === NULL) {
78: $this->_wrapper = new Api\ApiWrapper($this->iniConfiguration->get('datadepo'), $this->iniConfiguration->get('account'));
79: }
80: return $this->_wrapper;
81: }
82:
83:
84:
85:
86: 87: 88:
89: protected function runningState()
90: {
91: $running = new Api\RunningFiles($this->iniConfiguration, $this->getRunningFileName());
92: if ($running->exists()) {
93: throw new Api\DataDepoRunningException;
94: }
95: $running->create();
96: return $running;
97: }
98:
99: 100: 101:
102: protected function getRunningFileName()
103: {
104: $rf = new \ReflectionClass(get_class($this));
105: return strtolower($rf->getShortName());
106: }
107:
108: 109: 110:
111: protected function suspendedState()
112: {
113: $config = $this->dataStore->getConfig('suspendedToTime');
114: if (isset($config['suspendedToTime']) && $config['suspendedToTime'] >= time()) {
115: throw new Api\DataDepoSuspendedException;
116: }
117: return FALSE;
118: }
119:
120:
121:
122:
123:
124: 125: 126: 127: 128:
129: protected function callSync($name, $configRaw, $limit)
130: {
131:
132: $config = array('last' => isset($configRaw[$name . '_last']) ? $configRaw[$name . '_last'] : 0,
133: 'rows' => isset($configRaw[$name . '_rows']) ? $configRaw[$name . '_rows'] : 0);
134:
135: $filePath = $this->iniConfiguration->getTempPath() . '/' . $name . '.data';
136: if ($config['rows'] == 0) {
137:
138: $response = $this->getWrapper()->request('data', $name, array('lastDownload' => $config['last']));
139:
140:
141: switch ($this->getWrapper()->analyzeResponse($response)) {
142: case 'url':
143:
144: $this->getWrapper()->download($response->url, $filePath);
145: break;
146: case 'suspend':
147:
148: return $this->suspend($response->until);
149: default:
150: throw new Api\ApiException('Response from datadepo is unknow');
151: }
152: }
153:
154:
155: $file = new \SplFileObject($filePath);
156: $header = json_decode($file->fgets(), TRUE);
157:
158:
159: if ($config['rows'] == 0) {
160: $file->seek(PHP_INT_MAX);
161: $linesTotal = $file->key()+1;
162: if ($header['numRecords'] != $linesTotal-1) {
163: throw new Api\ApiException('Incompleted file downloaded in ' . $name . ' synchronizer');
164: }
165: $file->rewind();
166: }
167:
168: $processCount = $this->iniConfiguration->get('limits', 'processCount');
169: $collector = $this->createCollector();
170: $count = 0;
171:
172:
173: $file->seek($config['rows']+1);
174: while ((!$file->eof() || $file->current() !== FALSE) && $count < $limit) {
175:
176: $line = $this->wrapLine($file->current());
177: $collector->add($line);
178: $count++;
179:
180:
181: if ($count % $processCount === 0) {
182: $this->processChunk($collector);
183: $collector = $this->createCollector();
184: }
185: $file->next();
186: }
187:
188:
189: if (count($collector) > 0) {
190: $this->processChunk($collector);
191: }
192:
193:
194: if ($count == $limit) {
195: $this->dataStore->setConfig($name . '_rows', $config['rows']+$count);
196: }
197: else {
198: $this->dataStore->setConfig($name . '_rows', 0);
199: $this->dataStore->setConfig($name . '_last', $header['generatedAt']);
200: }
201:
202: return new Api\DataDepoResponse(Api\DataDepoResponse::CODE_OK, NULL, array('processed' => $count));
203: }
204:
205: 206: 207:
208: protected function createCollector()
209: {
210: return new Api\Collector;
211: }
212:
213: 214: 215: 216:
217: protected function suspend($date)
218: {
219: $this->dataStore->setConfig('suspendedToTime', strtotime($date));
220: $this->getWrapper()->request('put', 'suspend', array('until' => $date));
221: return new Api\DataDepoResponse(Api\DataDepoResponse::CODE_SUSPENDED);
222: }
223:
224: }