aboutsummaryrefslogtreecommitdiffstats
path: root/app/workers
diff options
context:
space:
mode:
authorLuc Donnet2017-12-20 10:18:01 +0100
committerGitHub2017-12-20 10:18:01 +0100
commite6ab30bc089b9d30a4222df214a70097df651d72 (patch)
treeb62489c965045d0fa897f02caeeb0b59d436a062 /app/workers
parentc13540b1b10451c9b26045cbfcb5ec397d1ddbc0 (diff)
parentb4f0fe5ac25c1d58c7396f55fb66de7313783d9c (diff)
downloadchouette-core-e6ab30bc089b9d30a4222df214a70097df651d72.tar.bz2
Merge pull request #124 from af83/5006-wb_import_filter_refs_with_foreign_lines
5006 wb import filter refs with foreign lines
Diffstat (limited to 'app/workers')
-rw-r--r--app/workers/workbench_import_worker.rb92
-rw-r--r--app/workers/workbench_import_worker/object_state_updater.rb36
2 files changed, 87 insertions, 41 deletions
diff --git a/app/workers/workbench_import_worker.rb b/app/workers/workbench_import_worker.rb
index de51efded..6420be835 100644
--- a/app/workers/workbench_import_worker.rb
+++ b/app/workers/workbench_import_worker.rb
@@ -3,29 +3,25 @@ class WorkbenchImportWorker
include Rails.application.routes.url_helpers
include Configurable
+ include ObjectStateUpdater
+
+ attr_reader :entries, :workbench_import
+
# Workers
# =======
def perform(import_id)
- @workbench_import = WorkbenchImport.find(import_id)
- @response = nil
- @workbench_import.update(status: 'running', started_at: Time.now)
- downloaded = download
- zip_service = ZipService.new(downloaded)
+ @entries = 0
+ @workbench_import ||= WorkbenchImport.find(import_id)
+
+ workbench_import.update(status: 'running', started_at: Time.now)
+ zip_service = ZipService.new(downloaded, allowed_lines)
upload zip_service
- @workbench_import.update(ended_at: Time.now)
+ workbench_import.update(ended_at: Time.now)
rescue Zip::Error
handle_corrupt_zip_file
end
- def download
- logger.info "HTTP GET #{import_url}"
- HTTPService.get_resource(
- host: import_host,
- path: import_path,
- params: {token: @workbench_import.token_download}).body
- end
-
def execute_post eg_name, eg_file
logger.info "HTTP POST #{export_url} (for #{complete_entry_group_name(eg_name)})"
HTTPService.post_resource(
@@ -35,48 +31,43 @@ class WorkbenchImportWorker
end
def handle_corrupt_zip_file
- @workbench_import.messages.create(criticity: :error, message_key: 'corrupt_zip_file', message_attributes: {source_filename: @workbench_import.file.file.file})
+ workbench_import.messages.create(criticity: :error, message_key: 'corrupt_zip_file', message_attributes: {source_filename: workbench_import.file.file.file})
end
def upload zip_service
entry_group_streams = zip_service.subdirs
- @workbench_import.update total_steps: entry_group_streams.size
entry_group_streams.each_with_index(&method(:upload_entry_group))
+ workbench_import.update total_steps: @entries
rescue Exception => e
logger.error e.message
- @workbench_import.update( current_step: entry_group_streams.size, status: 'failed' )
+ workbench_import.update( current_step: @entries, status: 'failed' )
raise
end
- def update_object_state entry, count
- @workbench_import.update( current_step: count )
- unless entry.spurious.empty?
- @workbench_import.messages.create(
- criticity: :warning,
- message_key: 'inconsistent_zip_file',
- message_attributes: {
- 'source_filename' => @workbench_import.file.file.file,
- 'spurious_dirs' => entry.spurious.join(', ')
- })
- end
- end
def upload_entry_group entry, element_count
update_object_state entry, element_count.succ
+ return unless entry.ok?
# status = retry_service.execute(&upload_entry_group_proc(entry))
- eg_name = entry.name
- eg_stream = entry.stream
+ upload_entry_group_stream entry.name, entry.stream
+ end
+ def upload_entry_group_stream eg_name, eg_stream
FileUtils.mkdir_p(Rails.root.join('tmp', 'imports'))
- eg_file = File.new(Rails.root.join('tmp', 'imports', "WorkbenchImport_#{eg_name}_#{$$}.zip"), 'wb').tap do |file|
+ File.open(Rails.root.join('tmp', 'imports', "WorkbenchImport_#{eg_name}_#{$$}.zip"), 'wb') do |file|
eg_stream.rewind
file.write eg_stream.read
end
- eg_file.close
- eg_file = File.new(Rails.root.join('tmp', 'imports', "WorkbenchImport_#{eg_name}_#{$$}.zip"))
+
+ upload_entry_group_tmpfile eg_name, File.new(Rails.root.join('tmp', 'imports', "WorkbenchImport_#{eg_name}_#{$$}.zip"))
+ end
+
+ def upload_entry_group_tmpfile eg_name, eg_file
result = execute_post eg_name, eg_file
if result && result.status < 400
+ @entries += 1
+ workbench_import.update( current_step: @entries )
result
else
raise StopIteration, result.body
@@ -91,7 +82,7 @@ class WorkbenchImportWorker
# =======
def complete_entry_group_name entry_group_name
- [@workbench_import.name, entry_group_name].join("--")
+ [workbench_import.name, entry_group_name].join("--")
end
# Constants
@@ -111,7 +102,7 @@ class WorkbenchImportWorker
Rails.application.config.rails_host
end
def import_path
- @__import_path__ ||= download_workbench_import_path(@workbench_import.workbench, @workbench_import)
+ @__import_path__ ||= download_workbench_import_path(workbench_import.workbench, workbench_import)
end
def import_url
@__import_url__ ||= File.join(import_host, import_path)
@@ -119,10 +110,29 @@ class WorkbenchImportWorker
def params file, name
{ netex_import:
- { parent_id: @workbench_import.id,
- parent_type: @workbench_import.class.name,
- workbench_id: @workbench_import.workbench_id,
- name: name,
- file: HTTPService.upload(file, 'application/zip', "#{name}.zip") } }
+ { parent_id: workbench_import.id,
+ parent_type: workbench_import.class.name,
+ workbench_id: workbench_import.workbench_id,
+ name: name,
+ file: HTTPService.upload(file, 'application/zip', "#{name}.zip") } }
+ end
+
+ # Lazy Values
+ # ===========
+
+ def allowed_lines
+ @__allowed_lines__ ||= workbench_import.workbench.organisation.lines_set
end
+ def downloaded
+ @__downloaded__ ||= download_response.body
+ end
+ def download_response
+ @__download_response__ ||= HTTPService.get_resource(
+ host: import_host,
+ path: import_path,
+ params: {token: workbench_import.token_download}).tap do
+ logger.info "HTTP GET #{import_url}"
+ end
+ end
+
end
diff --git a/app/workers/workbench_import_worker/object_state_updater.rb b/app/workers/workbench_import_worker/object_state_updater.rb
new file mode 100644
index 000000000..e9cc081b7
--- /dev/null
+++ b/app/workers/workbench_import_worker/object_state_updater.rb
@@ -0,0 +1,36 @@
+
+class WorkbenchImportWorker
+ module ObjectStateUpdater
+
+ def update_object_state entry, count
+ workbench_import.update( total_steps: count )
+ update_spurious entry
+ update_foreign_lines entry
+ end
+
+
+ private
+
+ def update_foreign_lines entry
+ return if entry.foreign_lines.empty?
+ workbench_import.messages.create(
+ criticity: :warning,
+ message_key: 'foreign_lines_in_referential',
+ message_attributes: {
+ 'source_filename' => workbench_import.file.file.file,
+ 'foreign_lines' => entry.foreign_lines.join(', ')
+ })
+ end
+
+ def update_spurious entry
+ return if entry.spurious.empty?
+ workbench_import.messages.create(
+ criticity: :warning,
+ message_key: 'inconsistent_zip_file',
+ message_attributes: {
+ 'source_filename' => workbench_import.file.file.file,
+ 'spurious_dirs' => entry.spurious.join(', ')
+ })
+ end
+ end
+end